#include <XrdClHttpUtil.hh>
HandlerQueue is a deque of curl operations that need to be performed. The object is thread safe and can be waited on via poll().
Being pollable is necessary because the multi-curl driver thread is based on polling file descriptors.
Definition at line 167 of file XrdClHttpUtil.hh.
◆ HandlerQueue()
| HandlerQueue::HandlerQueue |
( |
unsigned |
max_pending_ops | ) |
|
Definition at line 555 of file XrdClHttpUtil.cc.
556 m_max_pending_ops(max_pending_ops)
559 auto result = pipe(filedes);
561 throw std::runtime_error(strerror(errno));
563 if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
566 throw std::runtime_error(strerror(errno));
568 m_read_fd = filedes[0];
569 m_write_fd = filedes[1];
References close.
◆ Consume()
| std::shared_ptr< CurlOperation > HandlerQueue::Consume |
( |
std::chrono::steady_clock::duration |
dur | ) |
|
Definition at line 788 of file XrdClHttpUtil.cc.
790 std::unique_lock<std::mutex> lk(m_mutex);
791 m_consumer_cv.wait_for(lk, dur, [&]{
return m_ops.size() > 0 || m_shutdown;});
792 if (m_shutdown || m_ops.empty()) {
796 std::shared_ptr<CurlOperation> result = m_ops.front();
801 auto result =
read(m_read_fd, ready, 1);
803 if (errno == EINTR) {
805 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
810 throw std::runtime_error(strerror(errno));
816 m_producer_cv.notify_one();
817 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
ssize_t read(int fildes, void *buf, size_t nbyte)
References read().
◆ Expire()
| void HandlerQueue::Expire |
( |
| ) |
|
Definition at line 687 of file XrdClHttpUtil.cc.
689 std::unique_lock<std::mutex> lk(m_mutex);
690 auto now = std::chrono::steady_clock::now();
693 for (
auto &op : m_ops) {
694 if (!op->IsPaused())
continue;
696 if (op->TransferStalled(0, now)) {
697 op->ContinueHandle();
701 std::vector<decltype(m_ops)::value_type> expired_ops;
702 unsigned expired_count = 0;
703 auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704 [&](
const std::shared_ptr<CurlOperation> &handler) {
705 auto expired = handler->GetOperationExpiry() < now;
707 expired_ops.push_back(handler);
712 m_ops.erase(it, m_ops.end());
717 unsigned bytes_to_read = expired_count;
718 while (bytes_to_read > 0) {
719 size_t chunk = std::min<size_t>(
sizeof(throwaway), bytes_to_read);
720 ssize_t n =
read(m_read_fd, throwaway, chunk);
723 }
else if (n == -1) {
724 if (errno == EINTR) {
744 for (
auto &handler : expired_ops) {
const uint16_t errOperationExpired
◆ GetDefaultMaxPendingOps()
| static unsigned XrdClHttp::HandlerQueue::GetDefaultMaxPendingOps |
( |
| ) |
|
|
inline static |
◆ GetHandle()
| CURL * HandlerQueue::GetHandle |
( |
| ) |
|
◆ GetMonitoringJson()
| std::string HandlerQueue::GetMonitoringJson |
( |
| ) |
|
|
static |
Definition at line 823 of file XrdClHttpUtil.cc.
825 auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826 auto produced = m_ops_produced.load(std::memory_order_relaxed);
828 "\"produced\":" + std::to_string(produced) +
","
829 "\"consumed\":" + std::to_string(consumed) +
","
830 "\"pending\":" + std::to_string(produced - consumed) +
","
831 "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
◆ PollFD()
| int XrdClHttp::HandlerQueue::PollFD |
( |
| ) |
const |
|
inline |
◆ Produce()
| void HandlerQueue::Produce |
( |
std::shared_ptr< CurlOperation > |
handler | ) |
|
Definition at line 750 of file XrdClHttpUtil.cc.
752 auto handler_expiry = handler->GetOperationExpiry();
753 std::unique_lock<std::mutex> lk{m_mutex};
754 m_producer_cv.wait_until(lk,
756 [&]{
return m_ops.size() < m_max_pending_ops;}
758 if (std::chrono::steady_clock::now() > handler_expiry) {
761 m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
765 m_ops.push_back(handler);
768 auto result =
write(m_write_fd, ready, 1);
770 if (errno == EINTR) {
772 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
777 throw std::runtime_error(strerror(errno));
783 m_consumer_cv.notify_one();
784 m_ops_produced.fetch_add(1, std::memory_order_relaxed);
References XrdCl::errOperationExpired and write.
◆ RecycleHandle()
| void HandlerQueue::RecycleHandle |
( |
CURL * |
curl | ) |
|
◆ ReleaseHandles()
| void HandlerQueue::ReleaseHandles |
( |
| ) |
|
Definition at line 879 of file XrdClHttpUtil.cc.
881 for (
auto handle : m_handles) {
882 curl_easy_cleanup(handle);
◆ Shutdown()
| void HandlerQueue::Shutdown |
( |
| ) |
|
Definition at line 871 of file XrdClHttpUtil.cc.
873 std::unique_lock lock(m_mutex);
875 m_consumer_cv.notify_all();
◆ TryConsume()
Definition at line 836 of file XrdClHttpUtil.cc.
838 std::unique_lock<std::mutex> lk(m_mutex);
839 if (m_ops.size() == 0) {
840 std::shared_ptr<CurlOperation> result;
844 std::shared_ptr<CurlOperation> result = m_ops.front();
849 auto result =
read(m_read_fd, ready, 1);
851 if (errno == EINTR) {
853 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
858 throw std::runtime_error(strerror(errno));
864 m_producer_cv.notify_one();
865 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
References read().
The documentation for this class was generated from the following files: