XRootD
XrdClHttp::HandlerQueue Class Reference

#include <XrdClHttpUtil.hh>

+ Collaboration diagram for XrdClHttp::HandlerQueue:

Public Member Functions

 HandlerQueue (unsigned max_pending_ops)
 
std::shared_ptr< CurlOperation > Consume (std::chrono::steady_clock::duration)
 
void Expire ()
 
CURL * GetHandle ()
 
int PollFD () const
 
void Produce (std::shared_ptr< CurlOperation > handler)
 
void RecycleHandle (CURL *)
 
void ReleaseHandles ()
 
void Shutdown ()
 
std::shared_ptr< CurlOperation > TryConsume ()
 

Static Public Member Functions

static unsigned GetDefaultMaxPendingOps ()
 
static std::string GetMonitoringJson ()
 

Detailed Description

HandlerQueue is a deque of curl operations that need to be performed. The object is thread safe and can be waited on via poll().

The queue must be poll()-able because the multi-curl driver thread is built around polling file descriptors.

Definition at line 167 of file XrdClHttpUtil.hh.

Constructor & Destructor Documentation

◆ HandlerQueue()

HandlerQueue::HandlerQueue ( unsigned  max_pending_ops)

Definition at line 555 of file XrdClHttpUtil.cc.

555  :
556  m_max_pending_ops(max_pending_ops)
557 {
558  int filedes[2];
559  auto result = pipe(filedes);
560  if (result == -1) {
561  throw std::runtime_error(strerror(errno));
562  }
563  if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
564  close(filedes[0]);
565  close(filedes[1]);
566  throw std::runtime_error(strerror(errno));
567  }
568  m_read_fd = filedes[0];
569  m_write_fd = filedes[1];
570 };
#define close(a)
Definition: XrdPosix.hh:48

References close.

Member Function Documentation

◆ Consume()

std::shared_ptr< CurlOperation > HandlerQueue::Consume ( std::chrono::steady_clock::duration  dur)

Definition at line 788 of file XrdClHttpUtil.cc.

789 {
790  std::unique_lock<std::mutex> lk(m_mutex);
791  m_consumer_cv.wait_for(lk, dur, [&]{return m_ops.size() > 0 || m_shutdown;});
792  if (m_shutdown || m_ops.empty()) {
793  return {};
794  }
795 
796  std::shared_ptr<CurlOperation> result = m_ops.front();
797  m_ops.pop_front();
798 
799  char ready[1];
800  while (true) {
801  auto result = read(m_read_fd, ready, 1);
802  if (result == -1) {
803  if (errno == EINTR) {
804  continue;
805  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
806  // This should never happen, but if it does, just continue
807  // as if we successfully read the byte.
808  break;
809  }
810  throw std::runtime_error(strerror(errno));
811  }
812  break;
813  }
814 
815  lk.unlock();
816  m_producer_cv.notify_one();
817  m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
818 
819  return result;
820 }
ssize_t read(int fildes, void *buf, size_t nbyte)

References read().

+ Here is the call graph for this function:

◆ Expire()

void HandlerQueue::Expire ( )

Definition at line 687 of file XrdClHttpUtil.cc.

688 {
689  std::unique_lock<std::mutex> lk(m_mutex);
690  auto now = std::chrono::steady_clock::now();
691 
692  // Iterate through the paused transfers, checking if they are done.
693  for (auto &op : m_ops) {
694  if (!op->IsPaused()) continue;
695 
696  if (op->TransferStalled(0, now)) {
697  op->ContinueHandle();
698  }
699  }
700 
701  std::vector<decltype(m_ops)::value_type> expired_ops;
702  unsigned expired_count = 0;
703  auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704  [&](const std::shared_ptr<CurlOperation> &handler) {
705  auto expired = handler->GetOperationExpiry() < now;
706  if (expired) {
707  expired_ops.push_back(handler);
708  expired_count++;
709  }
710  return expired;
711  });
712  m_ops.erase(it, m_ops.end());
713 
714  // The contents of our pipe and the in-memory queue are now off by expired_count.
715  // Read exactly that many bytes from the pipe and throw them away.
716  char throwaway[64];
717  unsigned bytes_to_read = expired_count;
718  while (bytes_to_read > 0) {
719  size_t chunk = std::min<size_t>(sizeof(throwaway), bytes_to_read);
720  ssize_t n = read(m_read_fd, throwaway, chunk);
721  if (n > 0) {
722  bytes_to_read -= n;
723  } else if (n == -1) {
724  if (errno == EINTR) {
725  continue;
726  } else {
727  // EWOULDBLOCK is a possibility if there's a synchronization error;
728  // for now, just continue on as if we were successful in reading out
729  // the missing bytes
730  break;
731  }
732  } else {
733  break;
734  }
735  }
736 
737  // Note: the failure handler may trigger new operations submitted to the queue
738  // (which requires the lock to be held) such as a prefetch operation that gets split
739  // into multiple sub-operations.
740  //
741  // Thus, we must unlock the mutex protecting the queue and avoid touching the shared state of
742  // m_ops.
743  lk.unlock();
744  for (auto &handler : expired_ops) {
745  if (handler) handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while in queue");
746  }
747 }
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

◆ GetDefaultMaxPendingOps()

static unsigned XrdClHttp::HandlerQueue::GetDefaultMaxPendingOps ( )
inline static

Definition at line 194 of file XrdClHttpUtil.hh.

194 {return m_default_max_pending_ops;}

◆ GetHandle()

CURL * HandlerQueue::GetHandle ( )

Definition at line 671 of file XrdClHttpUtil.cc.

671  {
672  if (m_handles.size()) {
673  auto result = m_handles.back();
674  m_handles.pop_back();
675  return result;
676  }
677 
678  return ::GetHandle(EnableCurlHeaderDump());
679 }
CURL * GetHandle(bool verbose)

References XrdClHttp::GetHandle().

+ Here is the call graph for this function:

◆ GetMonitoringJson()

std::string HandlerQueue::GetMonitoringJson ( )
static

Definition at line 823 of file XrdClHttpUtil.cc.

824 {
825  auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826  auto produced = m_ops_produced.load(std::memory_order_relaxed);
827  return "{"
828  "\"produced\":" + std::to_string(produced) + ","
829  "\"consumed\":" + std::to_string(consumed) + ","
830  "\"pending\":" + std::to_string(produced - consumed) + ","
831  "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
832  "}";
833 }

◆ PollFD()

int XrdClHttp::HandlerQueue::PollFD ( ) const
inline

Definition at line 176 of file XrdClHttpUtil.hh.

176 {return m_read_fd;}

◆ Produce()

void HandlerQueue::Produce ( std::shared_ptr< CurlOperation >  handler)

Definition at line 750 of file XrdClHttpUtil.cc.

751 {
752  auto handler_expiry = handler->GetOperationExpiry();
753  std::unique_lock<std::mutex> lk{m_mutex};
754  m_producer_cv.wait_until(lk,
755  handler_expiry,
756  [&]{return m_ops.size() < m_max_pending_ops;}
757  );
758  if (std::chrono::steady_clock::now() > handler_expiry) {
759  lk.unlock();
760  handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while waiting for worker");
761  m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
762  return;
763  }
764 
765  m_ops.push_back(handler);
766  char ready[] = "1";
767  while (true) {
768  auto result = write(m_write_fd, ready, 1);
769  if (result == -1) {
770  if (errno == EINTR) {
771  continue;
772  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
773  // This should never happen, but if it does, just continue
774  // as if we successfully wrote the notification to the pipe.
775  break;
776  }
777  throw std::runtime_error(strerror(errno));
778  }
779  break;
780  }
781 
782  lk.unlock();
783  m_consumer_cv.notify_one();
784  m_ops_produced.fetch_add(1, std::memory_order_relaxed);
785 }
#define write(a, b, c)
Definition: XrdPosix.hh:123

References XrdCl::errOperationExpired, and write.

◆ RecycleHandle()

void HandlerQueue::RecycleHandle ( CURL *  curl)

Definition at line 682 of file XrdClHttpUtil.cc.

682  {
683  m_handles.push_back(curl);
684 }

◆ ReleaseHandles()

void HandlerQueue::ReleaseHandles ( )

Definition at line 879 of file XrdClHttpUtil.cc.

880 {
881  for (auto handle : m_handles) {
882  curl_easy_cleanup(handle);
883  }
884  m_handles.clear();
885 }

◆ Shutdown()

void HandlerQueue::Shutdown ( )

Definition at line 871 of file XrdClHttpUtil.cc.

872 {
873  std::unique_lock lock(m_mutex);
874  m_shutdown = true;
875  m_consumer_cv.notify_all();
876 }

◆ TryConsume()

std::shared_ptr< CurlOperation > HandlerQueue::TryConsume ( )

Definition at line 836 of file XrdClHttpUtil.cc.

837 {
838  std::unique_lock<std::mutex> lk(m_mutex);
839  if (m_ops.size() == 0) {
840  std::shared_ptr<CurlOperation> result;
841  return result;
842  }
843 
844  std::shared_ptr<CurlOperation> result = m_ops.front();
845  m_ops.pop_front();
846 
847  char ready[1];
848  while (true) {
849  auto result = read(m_read_fd, ready, 1);
850  if (result == -1) {
851  if (errno == EINTR) {
852  continue;
853  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
854  // This should never happen, but if it does, just continue
855  // as if we successfully read the byte.
856  break;
857  }
858  throw std::runtime_error(strerror(errno));
859  }
860  break;
861  }
862 
863  lk.unlock();
864  m_producer_cv.notify_one();
865  m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
866 
867  return result;
868 }

References read().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: