XRootD
XrdClHttpUtil.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpFile.hh"
22 #include "XrdClHttpOps.hh"
23 #include "XrdClHttpOptionsCache.hh"
24 #include "XrdClHttpUtil.hh"
25 #include "XrdClHttpWorker.hh"
26 
27 #include <XProtocol/XProtocol.hh>
28 #include <XrdCl/XrdClDefaultEnv.hh>
29 #include <XrdCl/XrdClLog.hh>
30 #include <XrdCl/XrdClURL.hh>
32 #include <XrdOuc/XrdOucCRC.hh>
34 #include <XrdSys/XrdSysPageSize.hh>
35 #include <XrdVersion.hh>
36 
37 #include <curl/curl.h>
38 #include <openssl/bio.h>
39 #include <openssl/evp.h>
40 
41 #include <fcntl.h>
42 #include <fstream>
43 #ifdef __APPLE__
44 #include <pthread.h>
45 #else
46 #include <sys/syscall.h>
47 #include <sys/types.h>
48 #endif
49 #include <unistd.h>
50 
51 #include <charconv>
52 #include <sstream>
53 #include <stdexcept>
54 #include <utility>
55 
56 using namespace XrdClHttp;
57 
58 thread_local std::vector<CURL*> HandlerQueue::m_handles;
59 std::atomic<unsigned> CurlWorker::m_maintenance_period = 5;
60 std::vector<std::unique_ptr<XrdClHttp::CurlWorker>> CurlWorker::m_workers;
61 std::mutex CurlWorker::m_workers_mutex;
62 
63 // Performance statistics for the worker
64 std::atomic<uint64_t> CurlWorker::m_conncall_errors = 0;
65 std::atomic<uint64_t> CurlWorker::m_conncall_req = 0;
66 std::atomic<uint64_t> CurlWorker::m_conncall_success = 0;
67 std::atomic<uint64_t> CurlWorker::m_conncall_timeout = 0;
68 decltype(CurlWorker::m_ops) CurlWorker::m_ops = {};
69 std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_last_completed_cycle;
70 std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_oldest_op;
71 std::mutex CurlWorker::m_worker_stats_mutex;
72 
73 // Performance statistics for the queue
74 std::atomic<uint64_t> HandlerQueue::m_ops_consumed = 0; // Count of operations consumed from the queue.
75 std::atomic<uint64_t> HandlerQueue::m_ops_produced = 0; // Count of operations added to the queue.
76 std::atomic<uint64_t> HandlerQueue::m_ops_rejected = 0; // Count of operations rejected by the queue.
77 
78 // shutdown + init trigger, must be last of the static members
79 CurlWorker::initcontrol CurlWorker::m_initcontrol;
80 
82  CURL *curl{nullptr};
83  time_t expiry{0};
84 };
85 
86 namespace {
87 
// Return a numeric identifier for the calling thread.
//
// - Linux: the kernel thread ID, fetched with a raw gettid syscall. glibc only
//   gained a gettid() wrapper in 2.30 and RHEL 8 ships glibc 2.28, so calling
//   the syscall directly avoids extra feature checks until that platform is
//   dropped.
// - macOS: the 64-bit pthread thread ID (converted to pid_t on return).
// - Anything else: the process ID.
pid_t getthreadid() {
#if defined(__APPLE__)
    uint64_t mac_thread_id;
    pthread_threadid_np(pthread_self(), &mac_thread_id);
    return mac_thread_id;
#elif defined(__linux__)
    return syscall(SYS_gettid);
#else
    return getpid();
#endif
}
103 
104 }
105 
namespace XrdClHttp {

// Report whether an HTTP status code counts as a failure.
//
// Codes in [100, 400) are treated as non-errors; everything below 100 or at
// 400 and above is an error.
bool HTTPStatusIsError(unsigned status) {
    const bool acceptable = (status >= 100) && (status < 400);
    return !acceptable;
}

} // namespace XrdClHttp
109 
110 std::pair<uint16_t, uint32_t> XrdClHttp::HTTPStatusConvert(unsigned status) {
111  switch (status) {
112  case 400: // Bad Request
113  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
114  case 401: // Unauthorized (needs authentication)
115  return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
116  case 402: // Payment Required
117  case 403: // Forbidden (failed authorization)
118  return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
119  case 404:
120  return std::make_pair(XrdCl::errErrorResponse, kXR_NotFound);
121  case 405: // Method not allowed
122  case 406: // Not acceptable
123  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
124  case 407: // Proxy Authentication Required
125  return std::make_pair(XrdCl::errErrorResponse, kXR_NotAuthorized);
126  case 408: // Request timeout
127  return std::make_pair(XrdCl::errErrorResponse, kXR_ReqTimedOut);
128  case 409: // Conflict
129  return std::make_pair(XrdCl::errErrorResponse, kXR_Conflict);
130  case 410: // Gone
131  return std::make_pair(XrdCl::errErrorResponse, kXR_NotFound);
132  case 411: // Length required
133  case 412: // Precondition failed
134  case 413: // Payload too large
135  case 414: // URI too long
136  case 415: // Unsupported Media Type
137  case 416: // Range Not Satisfiable
138  case 417: // Expectation Failed
139  case 418: // I'm a teapot
140  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
141  case 421: // Misdirected Request
142  case 422: // Unprocessable Content
143  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
144  case 423: // Locked
145  return std::make_pair(XrdCl::errErrorResponse, kXR_FileLocked);
146  case 424: // Failed Dependency
147  case 425: // Too Early
148  case 426: // Upgrade Required
149  case 428: // Precondition Required
150  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
151  case 429: // Too Many Requests
152  return std::make_pair(XrdCl::errErrorResponse, kXR_Overloaded);
153  case 431: // Request Header Fields Too Large
154  return std::make_pair(XrdCl::errErrorResponse, kXR_InvalidRequest);
155  case 451: // Unavailable For Legal Reasons
156  return std::make_pair(XrdCl::errErrorResponse, kXR_Impossible);
157  case 500: // Internal Server Error
158  case 501: // Not Implemented
159  case 502: // Bad Gateway
160  case 503: // Service Unavailable
161  return std::make_pair(XrdCl::errErrorResponse, kXR_ServerError);
162  case 504: // Gateway Timeout
163  return std::make_pair(XrdCl::errErrorResponse, kXR_ReqTimedOut);
164  case 507: // Insufficient Storage
165  return std::make_pair(XrdCl::errErrorResponse, kXR_overQuota);
166  case 508: // Loop Detected
167  case 510: // Not Extended
168  case 511: // Network Authentication Required
169  return std::make_pair(XrdCl::errErrorResponse, kXR_ServerError);
170  }
171  return std::make_pair(XrdCl::errUnknown, status);
172 }
173 
174 std::pair<uint16_t, uint32_t> CurlCodeConvert(CURLcode res) {
175  switch (res) {
176  case CURLE_OK:
177  return std::make_pair(XrdCl::errNone, 0);
178  case CURLE_COULDNT_RESOLVE_PROXY:
179  case CURLE_COULDNT_RESOLVE_HOST:
180  return std::make_pair(XrdCl::errInvalidAddr, 0);
181  case CURLE_LOGIN_DENIED:
182  // Commented-out cases are for platforms (RHEL7) where the error
183  // codes are undefined.
184  //case CURLE_AUTH_ERROR:
185  //case CURLE_SSL_CLIENTCERT:
186  case CURLE_REMOTE_ACCESS_DENIED:
187  return std::make_pair(XrdCl::errLoginFailed, EACCES);
188  case CURLE_SSL_CONNECT_ERROR:
189  case CURLE_SSL_ENGINE_NOTFOUND:
190  case CURLE_SSL_ENGINE_SETFAILED:
191  case CURLE_SSL_CERTPROBLEM:
192  case CURLE_SSL_CIPHER:
193  case 51: // In old curl versions, this is CURLE_PEER_FAILED_VERIFICATION; that constant was changed to be 60 / CURLE_SSL_CACERT
194  case CURLE_SSL_SHUTDOWN_FAILED:
195  case CURLE_SSL_CRL_BADFILE:
196  case CURLE_SSL_ISSUER_ERROR:
197  case CURLE_SSL_CACERT: // value is 60; merged with CURLE_PEER_FAILED_VERIFICATION
198  //case CURLE_SSL_PINNEDPUBKEYNOTMATCH:
199  //case CURLE_SSL_INVALIDCERTSTATUS:
200  return std::make_pair(XrdCl::errTlsError, 0);
201  case CURLE_SEND_ERROR:
202  case CURLE_RECV_ERROR:
203  return std::make_pair(XrdCl::errSocketError, EIO);
204  case CURLE_COULDNT_CONNECT:
205  case CURLE_GOT_NOTHING:
206  return std::make_pair(XrdCl::errConnectionError, ECONNREFUSED);
207  case CURLE_OPERATION_TIMEDOUT:
208 #ifdef HAVE_XPROTOCOL_TIMEREXPIRED
210 #else
211  return std::make_pair(XrdCl::errOperationExpired, ESTALE);
212 #endif
213  case CURLE_UNSUPPORTED_PROTOCOL:
214  case CURLE_NOT_BUILT_IN:
215  return std::make_pair(XrdCl::errNotSupported, ENOSYS);
216  case CURLE_FAILED_INIT:
217  return std::make_pair(XrdCl::errInternal, 0);
218  case CURLE_URL_MALFORMAT:
219  return std::make_pair(XrdCl::errInvalidArgs, res);
220  //case CURLE_WEIRD_SERVER_REPLY:
221  //case CURLE_HTTP2:
222  //case CURLE_HTTP2_STREAM:
223  return std::make_pair(XrdCl::errCorruptedHeader, res);
224  case CURLE_PARTIAL_FILE:
225  return std::make_pair(XrdCl::errDataError, res);
226  // These two errors indicate a failure in the callback. That
227  // should generate their own failures, meaning this should never
228  // get use.
229  case CURLE_READ_ERROR:
230  case CURLE_WRITE_ERROR:
231  return std::make_pair(XrdCl::errInternal, res);
232  case CURLE_RANGE_ERROR:
233  case CURLE_BAD_CONTENT_ENCODING:
234  return std::make_pair(XrdCl::errNotSupported, res);
235  case CURLE_TOO_MANY_REDIRECTS:
236  return std::make_pair(XrdCl::errRedirectLimit, res);
237  default:
238  return std::make_pair(XrdCl::errUnknown, res);
239  }
240 }
241 
242 bool HeaderParser::Base64Decode(std::string_view input, std::array<unsigned char, 32> &output) {
243  if (input.size() > 44 || input.size() % 4 != 0) return false;
244  if (input.size() == 0) return true;
245 
246  std::unique_ptr<BIO, decltype(&BIO_free_all)> b64(BIO_new(BIO_f_base64()), &BIO_free_all);
247  BIO_set_flags(b64.get(), BIO_FLAGS_BASE64_NO_NL);
248  std::unique_ptr<BIO, decltype(&BIO_free_all)> bmem(
249  BIO_new_mem_buf(const_cast<char *>(input.data()), input.size()), &BIO_free_all);
250  bmem.reset(BIO_push(b64.release(), bmem.release()));
251 
252  // Compute expected length of output; used to verify BIO_read consumes all input
253  size_t expectedLen = static_cast<size_t>(input.size() * 0.75);
254  if (input[input.size() - 1] == '=') {
255  expectedLen -= 1;
256  if (input[input.size() - 2] == '=') {
257  expectedLen -= 1;
258  }
259  }
260 
261  auto len = BIO_read(bmem.get(), &output[0], output.size());
262 
263  if (len == -1 || static_cast<size_t>(len) != expectedLen) return false;
264 
265  return true;
266 }
267 
268 // Parse a single header line.
269 //
270 // Curl promises for its callbacks "The header callback is
271 // called once for each header and only complete header lines
272 // are passed on to the callback".
273 bool HeaderParser::Parse(const std::string &header_line)
274 {
275  if (m_recv_all_headers) {
276  m_recv_all_headers = false;
277  m_recv_status_line = false;
278  }
279 
280  if (!m_recv_status_line) {
281  m_recv_status_line = true;
282 
283  std::stringstream ss(header_line);
284  std::string item;
285  if (!std::getline(ss, item, ' ')) return false;
286  m_resp_protocol = item;
287  if (!std::getline(ss, item, ' ')) return false;
288  try {
289  m_status_code = std::stol(item);
290  } catch (...) {
291  return false;
292  }
293  if (m_status_code < 100 || m_status_code >= 600) {
294  return false;
295  }
296  if (!std::getline(ss, item, '\n')) return false;
297  auto cr_loc = item.find('\r');
298  if (cr_loc != std::string::npos) {
299  m_resp_message = item.substr(0, cr_loc);
300  } else {
301  m_resp_message = item;
302  }
303  return true;
304  }
305 
306  if (header_line.empty() || header_line == "\n" || header_line == "\r\n") {
307  m_recv_all_headers = true;
308  return true;
309  }
310 
311  auto found = header_line.find(":");
312  if (found == std::string::npos) {
313  return false;
314  }
315 
316  std::string header_name = header_line.substr(0, found);
317  if (!Canonicalize(header_name)) {
318  return false;
319  }
320 
321  found += 1;
322  while (found < header_line.size()) {
323  if (header_line[found] != ' ') {break;}
324  found += 1;
325  }
326  std::string header_value = header_line.substr(found);
327  // Note: ignoring the fact headers are only supposed to contain ASCII.
328  // We should trim out UTF-8.
329  header_value.erase(header_value.find_last_not_of(" \r\n\t") + 1);
330 
331  // Record the line in our header structure. Will be returned as part
332  // of the response info object.
333  auto iter = m_headers.find(header_name);
334  if (iter == m_headers.end()) {
335  m_headers.insert(iter, {header_name, {header_value}});
336  } else {
337  iter->second.push_back(header_value);
338  }
339 
340  if (header_name == "Allow") {
341  std::string_view val(header_value);
342  while (!val.empty()) {
343  auto found = val.find(',');
344  auto method = val.substr(0, found);
345  if (method == "PROPFIND") {
346  auto new_verbs = static_cast<unsigned>(m_allow_verbs) | static_cast<unsigned>(VerbsCache::HttpVerb::kPROPFIND);
347  m_allow_verbs = static_cast<VerbsCache::HttpVerb>(new_verbs);
348  }
349  if (found == std::string_view::npos) break;
350  val = val.substr(found + 1);
351  }
352  if (static_cast<unsigned>(m_allow_verbs) & ~static_cast<unsigned>(VerbsCache::HttpVerb::kUnknown)) {
353  m_allow_verbs = static_cast<VerbsCache::HttpVerb>(static_cast<unsigned>(m_allow_verbs) & ~static_cast<unsigned>(VerbsCache::HttpVerb::kUnknown));
354  }
355  } else if (header_name == "Content-Length") {
356  try {
357  m_content_length = std::stoll(header_value);
358  } catch (...) {
359  return false;
360  }
361  }
362  else if (header_name == "Content-Type") {
363  std::string_view val(header_value);
364  auto found = val.find(";");
365  auto first_type = val.substr(0, found);
366  m_multipart_byteranges = first_type == "multipart/byteranges";
367  if (m_multipart_byteranges) {
368  auto remainder = val.substr(found + 1);
369  found = remainder.find("boundary=");
370  if (found != std::string_view::npos) {
371  SetMultipartSeparator(remainder.substr(found + 9));
372  }
373  }
374  }
375  else if (header_name == "Content-Range") {
376  auto found = header_value.find(" ");
377  if (found == std::string::npos) {
378  return false;
379  }
380  std::string range_unit = header_value.substr(0, found);
381  if (range_unit != "bytes") {
382  return false;
383  }
384  auto range_resp = header_value.substr(found + 1);
385  found = range_resp.find("/");
386  if (found == std::string::npos) {
387  return false;
388  }
389  auto incl_range = range_resp.substr(0, found);
390  found = incl_range.find("-");
391  if (found == std::string::npos) {
392  return false;
393  }
394  auto first_pos = incl_range.substr(0, found);
395  try {
396  m_response_offset = std::stoll(first_pos);
397  } catch (...) {
398  return false;
399  }
400  auto last_pos = incl_range.substr(found + 1);
401  size_t last_byte;
402  try {
403  last_byte = std::stoll(last_pos);
404  } catch (...) {
405  return false;
406  }
407  m_content_length = last_byte - m_response_offset + 1;
408  }
409  else if (header_name == "Location") {
410  m_location = header_value;
411  } else if (header_name == "Digest") {
412  ParseDigest(header_value, m_checksums);
413  }
414  else if (header_name == "Etag")
415  {
416  // Note, the original hader name is ETag, renamed to Etag in parsing
417  // remove additional quotes
418  m_etag = header_value;
419  m_etag.erase(remove(m_etag.begin(), m_etag.end(), '\"'), m_etag.end());
420  }
421  else if (header_name == "Cache-Control")
422  {
423  m_cache_control = header_value;
424  }
425 
426  return true;
427 }
428 
429 // Parse a RFC 3230 header into the checksum info structure
430 //
431 // If the parsing fails, the second element of the tuple will be false.
432 void HeaderParser::ParseDigest(const std::string &digest, XrdClHttp::ChecksumInfo &info) {
433  std::string_view view(digest);
434  std::array<unsigned char, 32> checksum_value;
435  std::string digest_lower;
436  while (!view.empty()) {
437  auto nextsep = view.find(',');
438  auto entry = view.substr(0, nextsep);
439  if (nextsep == std::string_view::npos) {
440  view = "";
441  } else {
442  view = view.substr(nextsep + 1);
443  }
444  nextsep = entry.find('=');
445  auto name = entry.substr(0, nextsep);
446  auto value = entry.substr(nextsep + 1);
447  digest_lower.clear();
448  digest_lower.resize(name.size());
449  std::transform(name.begin(), name.end(), digest_lower.begin(), [](unsigned char c) {
450  return std::tolower(c);
451  });
452  if (digest_lower == "md5") {
453  if (value.size() != 24) {
454  continue;
455  }
456  if (Base64Decode(value, checksum_value)) {
457  info.Set(XrdClHttp::ChecksumType::kMD5, checksum_value);
458  }
459  } else if (digest_lower == "crc32c") {
460  // XRootD currently incorrectly base64-encodes crc32c checksums; see
461  // https://github.com/xrootd/xrootd/issues/2456
462  // For backward comaptibility, if this looks like base64 encoded (8
463  // bytes long and last two bytes are padding), then we base64 decode.
464  if (value.size() == 8 && value[6] == '=' && value[7] == '=') {
465  if (Base64Decode(value, checksum_value)) {
466  info.Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
467  }
468  continue;
469  }
470  std::size_t pos{0};
471  unsigned long val;
472  try {
473  val = std::stoul(value.data(), &pos, 16);
474  } catch (...) {
475  continue;
476  }
477  if (pos == value.size()) {
478  checksum_value[0] = (val >> 24) & 0xFF;
479  checksum_value[1] = (val >> 16) & 0xFF;
480  checksum_value[2] = (val >> 8) & 0xFF;
481  checksum_value[3] = val & 0xFF;
482  info.Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
483  }
484  }
485  }
486 }
487 
488 // Convert the checksum type to a RFC 3230 digest name as recorded by IANA here:
489 // https://www.iana.org/assignments/http-dig-alg/http-dig-alg.xhtml
491  switch (type) {
492  case XrdClHttp::ChecksumType::kMD5:
493  return "MD5";
494  case XrdClHttp::ChecksumType::kCRC32C:
495  return "CRC32c";
496  case XrdClHttp::ChecksumType::kSHA1:
497  return "SHA";
498  case XrdClHttp::ChecksumType::kSHA256:
499  return "SHA-256";
500  default:
501  return "";
502  }
503 }
504 
505 // This clever approach was inspired by golang's net/textproto
506 bool HeaderParser::validHeaderByte(unsigned char c)
507 {
508  const static uint64_t mask_lower = 0 |
509  uint64_t((1<<10)-1) << '0' |
510  uint64_t(1) << '!' |
511  uint64_t(1) << '#' |
512  uint64_t(1) << '$' |
513  uint64_t(1) << '%' |
514  uint64_t(1) << '&' |
515  uint64_t(1) << '\'' |
516  uint64_t(1) << '*' |
517  uint64_t(1) << '+' |
518  uint64_t(1) << '-' |
519  uint64_t(1) << '.';
520 
521  const static uint64_t mask_upper = 0 |
522  uint64_t((1<<26)-1) << ('a'-64) |
523  uint64_t((1<<26)-1) << ('A'-64) |
524  uint64_t(1) << ('^'-64) |
525  uint64_t(1) << ('_'-64) |
526  uint64_t(1) << ('`'-64) |
527  uint64_t(1) << ('|'-64) |
528  uint64_t(1) << ('~'-64);
529 
530  if (c >= 128) return false;
531  if (c >= 64) return (uint64_t(1)<<(c-64)) & mask_upper;
532  return (uint64_t(1) << c) & mask_lower;
533 }
534 
535 bool HeaderParser::Canonicalize(std::string &headerName)
536 {
537  auto upper = true;
538  const static int toLower = 'a' - 'A';
539  for (size_t idx=0; idx<headerName.size(); idx++) {
540  char c = headerName[idx];
541  if (!validHeaderByte(c)) {
542  return false;
543  }
544  if (upper && 'a' <= c && c <= 'z') {
545  c -= toLower;
546  } else if (!upper && 'A' <= c && c <= 'Z') {
547  c += toLower;
548  }
549  headerName[idx] = c;
550  upper = c == '-';
551  }
552  return true;
553 }
554 
555 HandlerQueue::HandlerQueue(unsigned max_pending_ops) :
556  m_max_pending_ops(max_pending_ops)
557 {
558  int filedes[2];
559  auto result = pipe(filedes);
560  if (result == -1) {
561  throw std::runtime_error(strerror(errno));
562  }
563  if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
564  close(filedes[0]);
565  close(filedes[1]);
566  throw std::runtime_error(strerror(errno));
567  }
568  m_read_fd = filedes[0];
569  m_write_fd = filedes[1];
570 };
571 
572 namespace {
573 
574 bool EnableCurlHeaderDump() {
575  auto *log = XrdCl::DefaultEnv::GetLog();
576  if (log && log->GetLevel() >= XrdCl::Log::DumpMsg)
577  return true;
578 
579  return false;
580 }
581 
582 // Debug callback for libcurl headers; enabled with XRD_LOGLEVEL=Dump
583 int DumpHeader(CURL *handle, curl_infotype type, char *data, size_t size, void *clientp) {
584  (void)handle;
585  auto *logger = static_cast<XrdCl::Log *>(clientp);
586  if (!logger || !data || size == 0) {
587  return 0;
588  }
589 
590  const char *direction = nullptr;
591  switch (type) {
592  case CURLINFO_HEADER_OUT:
593  direction = ">";
594  break;
595  case CURLINFO_HEADER_IN:
596  direction = "<";
597  break;
598  default:
599  return 0;
600  }
601 
602  const std::string redacted = obfuscateAuth(std::string(data, size));
603  logger->Debug(kLogXrdClHttp, "%s %s", direction, redacted.c_str());
604  return 0;
605 }
606 
607 }
608 
namespace XrdClHttp {

// Trim whitespace (as classified by isspace) from the left side of a
// string_view.
//
// Returns a view into the same buffer starting at the first non-space
// character; the result is empty if the input is empty or all whitespace.
std::string_view ltrim_view(const std::string_view &input_view) {
    size_t start = 0;
    // isspace requires a value representable as unsigned char; a plain
    // (possibly negative) char is undefined behavior.
    while (start < input_view.size() && isspace(static_cast<unsigned char>(input_view[start]))) {
        ++start;
    }
    return input_view.substr(start);
}

// Trim whitespace (as classified by isspace) from both sides of a
// string_view.
//
// Returns a view into the same buffer; the result is empty if the input is
// empty or consists only of whitespace.
std::string_view trim_view(const std::string_view &input_view) {
    auto view = ltrim_view(input_view);
    // Bound the scan by the left-trimmed view itself: it may be shorter than
    // the input (or empty), so indexing from the input's size would read out
    // of bounds.
    while (!view.empty() && isspace(static_cast<unsigned char>(view.back()))) {
        view.remove_suffix(1);
    }
    return view;
}

} // namespace XrdClHttp
629 
630 CURL *
631 XrdClHttp::GetHandle(bool verbose) {
632  auto result = curl_easy_init();
633  if (result == nullptr) {
634  return result;
635  }
636 
637  curl_easy_setopt(result, CURLOPT_USERAGENT, "xrdcl-http/" XrdVERSION);
638  curl_easy_setopt(result, CURLOPT_DEBUGFUNCTION, DumpHeader);
639  curl_easy_setopt(result, CURLOPT_DEBUGDATA, XrdCl::DefaultEnv::GetLog());
640  if (verbose)
641  curl_easy_setopt(result, CURLOPT_VERBOSE, 1L);
642 
643  auto env = XrdCl::DefaultEnv::GetEnv();
644  std::string ca_file;
645  if (!env->GetString("HttpCertFile", ca_file) || ca_file.empty()) {
646  char *x509_ca_file = getenv("X509_CERT_FILE");
647  if (x509_ca_file) {
648  ca_file = std::string(x509_ca_file);
649  }
650  }
651  if (!ca_file.empty()) {
652  curl_easy_setopt(result, CURLOPT_CAINFO, ca_file.c_str());
653  }
654  std::string ca_dir;
655  if (!env->GetString("HttpCertDir", ca_dir) || ca_dir.empty()) {
656  char *x509_ca_dir = getenv("X509_CERT_DIR");
657  if (x509_ca_dir) {
658  ca_dir = std::string(x509_ca_dir);
659  }
660  }
661  if (!ca_dir.empty()) {
662  curl_easy_setopt(result, CURLOPT_CAPATH, ca_dir.c_str());
663  }
664 
665  curl_easy_setopt(result, CURLOPT_BUFFERSIZE, 32*1024);
666 
667  return result;
668 }
669 
670 CURL *
672  if (m_handles.size()) {
673  auto result = m_handles.back();
674  m_handles.pop_back();
675  return result;
676  }
677 
678  return ::GetHandle(EnableCurlHeaderDump());
679 }
680 
681 void
683  m_handles.push_back(curl);
684 }
685 
686 void
688 {
689  std::unique_lock<std::mutex> lk(m_mutex);
690  auto now = std::chrono::steady_clock::now();
691 
692  // Iterate through the paused transfers, checking if they are done.
693  for (auto &op : m_ops) {
694  if (!op->IsPaused()) continue;
695 
696  if (op->TransferStalled(0, now)) {
697  op->ContinueHandle();
698  }
699  }
700 
701  std::vector<decltype(m_ops)::value_type> expired_ops;
702  unsigned expired_count = 0;
703  auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704  [&](const std::shared_ptr<CurlOperation> &handler) {
705  auto expired = handler->GetOperationExpiry() < now;
706  if (expired) {
707  expired_ops.push_back(handler);
708  expired_count++;
709  }
710  return expired;
711  });
712  m_ops.erase(it, m_ops.end());
713 
714  // The contents of our pipe and the in-memory queue are now off by expired_count.
715  // Read exactly that many bytes from the pipe and throw them away.
716  char throwaway[64];
717  unsigned bytes_to_read = expired_count;
718  while (bytes_to_read > 0) {
719  size_t chunk = std::min<size_t>(sizeof(throwaway), bytes_to_read);
720  ssize_t n = read(m_read_fd, throwaway, chunk);
721  if (n > 0) {
722  bytes_to_read -= n;
723  } else if (n == -1) {
724  if (errno == EINTR) {
725  continue;
726  } else {
727  // EWOULDBLOCK is a possibility if there's a synchronization error;
728  // for now, just continue on as if we were successful in reading out
729  // the missing bytes
730  break;
731  }
732  } else {
733  break;
734  }
735  }
736 
737  // Note: the failure handler may trigger new operations submitted to the queue
738  // (which requires the lock to be held) such as a prefetch operation that gets split
739  // into multiple sub-operations.
740  //
741  // Thus, we must unlock the mutex protecting the queue and avoid touching the shared state of
742  // m_ops.
743  lk.unlock();
744  for (auto &handler : expired_ops) {
745  if (handler) handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while in queue");
746  }
747 }
748 
749 void
750 HandlerQueue::Produce(std::shared_ptr<CurlOperation> handler)
751 {
752  auto handler_expiry = handler->GetOperationExpiry();
753  std::unique_lock<std::mutex> lk{m_mutex};
754  m_producer_cv.wait_until(lk,
755  handler_expiry,
756  [&]{return m_ops.size() < m_max_pending_ops;}
757  );
758  if (std::chrono::steady_clock::now() > handler_expiry) {
759  lk.unlock();
760  handler->Fail(XrdCl::errOperationExpired, 0, "Operation expired while waiting for worker");
761  m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
762  return;
763  }
764 
765  m_ops.push_back(handler);
766  char ready[] = "1";
767  while (true) {
768  auto result = write(m_write_fd, ready, 1);
769  if (result == -1) {
770  if (errno == EINTR) {
771  continue;
772  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
773  // This should never happen, but if it does, just continue
774  // as if we successfully wrote the notification to the pipe.
775  break;
776  }
777  throw std::runtime_error(strerror(errno));
778  }
779  break;
780  }
781 
782  lk.unlock();
783  m_consumer_cv.notify_one();
784  m_ops_produced.fetch_add(1, std::memory_order_relaxed);
785 }
786 
787 std::shared_ptr<CurlOperation>
788 HandlerQueue::Consume(std::chrono::steady_clock::duration dur)
789 {
790  std::unique_lock<std::mutex> lk(m_mutex);
791  m_consumer_cv.wait_for(lk, dur, [&]{return m_ops.size() > 0 || m_shutdown;});
792  if (m_shutdown || m_ops.empty()) {
793  return {};
794  }
795 
796  std::shared_ptr<CurlOperation> result = m_ops.front();
797  m_ops.pop_front();
798 
799  char ready[1];
800  while (true) {
801  auto result = read(m_read_fd, ready, 1);
802  if (result == -1) {
803  if (errno == EINTR) {
804  continue;
805  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
806  // This should never happen, but if it does, just continue
807  // as if we successfully read the byte.
808  break;
809  }
810  throw std::runtime_error(strerror(errno));
811  }
812  break;
813  }
814 
815  lk.unlock();
816  m_producer_cv.notify_one();
817  m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
818 
819  return result;
820 }
821 
822 std::string
823 HandlerQueue::GetMonitoringJson()
824 {
825  auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826  auto produced = m_ops_produced.load(std::memory_order_relaxed);
827  return "{"
828  "\"produced\":" + std::to_string(produced) + ","
829  "\"consumed\":" + std::to_string(consumed) + ","
830  "\"pending\":" + std::to_string(produced - consumed) + ","
831  "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
832  "}";
833 }
834 
835 std::shared_ptr<CurlOperation>
836 HandlerQueue::TryConsume()
837 {
838  std::unique_lock<std::mutex> lk(m_mutex);
839  if (m_ops.size() == 0) {
840  std::shared_ptr<CurlOperation> result;
841  return result;
842  }
843 
844  std::shared_ptr<CurlOperation> result = m_ops.front();
845  m_ops.pop_front();
846 
847  char ready[1];
848  while (true) {
849  auto result = read(m_read_fd, ready, 1);
850  if (result == -1) {
851  if (errno == EINTR) {
852  continue;
853  } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
854  // This should never happen, but if it does, just continue
855  // as if we successfully read the byte.
856  break;
857  }
858  throw std::runtime_error(strerror(errno));
859  }
860  break;
861  }
862 
863  lk.unlock();
864  m_producer_cv.notify_one();
865  m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
866 
867  return result;
868 }
869 
870 void
871 HandlerQueue::Shutdown()
872 {
873  std::unique_lock lock(m_mutex);
874  m_shutdown = true;
875  m_consumer_cv.notify_all();
876 }
877 
878 void
879 HandlerQueue::ReleaseHandles()
880 {
881  for (auto handle : m_handles) {
882  curl_easy_cleanup(handle);
883  }
884  m_handles.clear();
885 }
886 
887 CurlWorker::CurlWorker(std::shared_ptr<HandlerQueue> queue, VerbsCache &cache, XrdCl::Log* logger) :
888  m_cache(cache),
889  m_queue(queue),
890  m_logger(logger)
891 {
892  {
893  std::unique_lock lk(m_worker_stats_mutex);
894  m_stats_offset = m_workers_last_completed_cycle.size();
895  m_workers_last_completed_cycle.push_back(&m_last_completed_cycle);
896  m_workers_oldest_op.push_back(&m_oldest_op);
897  }
898  int pipeInfo[2];
899  if ((pipe(pipeInfo) == -1) || (fcntl(pipeInfo[0], F_SETFD, FD_CLOEXEC)) || (fcntl(pipeInfo[1], F_SETFD, FD_CLOEXEC))) {
900  throw std::runtime_error("Failed to create shutdown monitoring pipe for curl worker");
901  }
902  m_shutdown_pipe_r = pipeInfo[0];
903  m_shutdown_pipe_w = pipeInfo[1];
904 
905  // Handle setup of the X509 authentication
906  auto env = XrdCl::DefaultEnv::GetEnv();
907  env->GetString("HttpClientCertFile", m_x509_client_cert_file);
908  env->GetString("HttpClientKeyFile", m_x509_client_key_file);
909 }
910 
911 std::tuple<std::string, std::string> CurlWorker::ClientX509CertKeyFile() const
912 {
913  return std::make_tuple(m_x509_client_cert_file, m_x509_client_key_file);
914 }
915 
916 std::string
918 {
919  auto now = std::chrono::system_clock::now().time_since_epoch().count();
920  auto oldest_op = now;
921  auto oldest_cycle = now;
922  {
923  std::unique_lock lk(m_worker_stats_mutex);
924  for (const auto &entry : m_workers_last_completed_cycle) {
925  if (!entry) {continue;}
926  auto cycle = entry->load(std::memory_order_relaxed);
927  if (cycle < oldest_cycle) oldest_cycle = cycle;
928  }
929  for (const auto &entry : m_workers_oldest_op) {
930  if (!entry) {continue;}
931  auto op = entry->load(std::memory_order_relaxed);
932  if (op < oldest_op) oldest_op = op;
933  }
934  }
935  auto oldest_op_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_op)).time_since_epoch()).count();
936  auto oldest_cycle_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_cycle)).time_since_epoch()).count();
937  std::string retval = "{"
938  "\"oldest_op\":" + std::to_string(oldest_op_dbl) + ","
939  "\"oldest_cycle\":" + std::to_string(oldest_cycle_dbl) + ","
940  ;
941 
942  for (size_t verb_idx = 0; verb_idx < static_cast<int>(XrdClHttp::CurlOperation::HttpVerb::Count); verb_idx++) {
943  const auto &verb_str = XrdClHttp::CurlOperation::GetVerbString(static_cast<XrdClHttp::CurlOperation::HttpVerb>(verb_idx));
944  for (size_t op_idx = 0; op_idx < 402; op_idx++) {
945  if (op_idx == 401) continue;
946 
947  auto &op_stats = m_ops[verb_idx][op_idx];
948  auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
949  if (duration == 0) continue;
950 
951  std::string prefix = "http_" + verb_str + "_" + ((op_idx == 402) ? "invalid" : std::to_string(200 + op_idx)) + "_";
952 
953  auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
954  retval += "\"" + prefix + "duration\":" + std::to_string(duration_dbl) + ",";
955 
956  duration = op_stats.m_pause_duration.load(std::memory_order_relaxed);
957  if (duration > 0) {
958  duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
959  retval += "\"" + prefix + "pause_duration\":" + std::to_string(duration_dbl) + ",";
960  }
961 
962  auto count = op_stats.m_bytes.load(std::memory_order_relaxed);
963  if (count) retval += "\"" + prefix + "bytes\":" + std::to_string(count) + ",";
964  count = op_stats.m_error.load(std::memory_order_relaxed);
965  if (count) retval += "\"" + prefix + "error\":" + std::to_string(count) + ",";
966  count = op_stats.m_finished.load(std::memory_order_relaxed);
967  if (count) retval += "\"" + prefix + "finished\":" + std::to_string(count) + ",";
968  count = op_stats.m_client_timeout.load(std::memory_order_relaxed);
969  if (count) retval += "\"" + prefix + "client_timeout\":" + std::to_string(count) + ",";
970  count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
971  if (count) retval += "\"" + prefix + "server_timeout\":" + std::to_string(count) + ",";
972  }
973  {
974  auto &op_stats = m_ops[verb_idx][401];
975  auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
976  if (duration == 0) continue;
977 
978  std::string prefix = "http_" + verb_str + "_";
979 
980  auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
981  retval += "\"" + prefix + "preheader_duration\":" + std::to_string(duration_dbl) + ",";
982 
983  auto count = op_stats.m_started.load(std::memory_order_relaxed);
984  if (count) retval += "\"" + prefix + "started\":" + std::to_string(count) + ",";
985  count = op_stats.m_error.load(std::memory_order_relaxed);
986  if (count) retval += "\"" + prefix + "preheader_error\":" + std::to_string(count) + ",";
987  count = op_stats.m_finished.load(std::memory_order_relaxed);
988  if (count) retval += "\"" + prefix + "preheader_finished\":" + std::to_string(count) + ",";
989  count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
990  if (count) retval += "\"" + prefix + "preheader_timeout\":" + std::to_string(count) + ",";
991  count = op_stats.m_conncall_timeout.load(std::memory_order_relaxed);
992  if (count) retval += "\"" + prefix + "conncall_timeout\":" + std::to_string(count) + ",";
993  }
994  }
995 
996  retval +=
997  "\"conncall_error\":" + std::to_string(m_conncall_errors.load(std::memory_order_relaxed)) + ","
998  "\"conncall_started\":" + std::to_string(m_conncall_req.load(std::memory_order_relaxed)) + ","
999  "\"conncall_success\":" + std::to_string(m_conncall_success.load(std::memory_order_relaxed)) + ","
1000  "\"conncall_timeout\":" + std::to_string(m_conncall_timeout.load(std::memory_order_relaxed)) +
1001  "}";
1002 
1003  return retval;
1004 }
1005 
1006 void
1007 CurlWorker::OpRecord(XrdClHttp::CurlOperation &op, OpKind kind)
1008 {
1009  int sc = op.GetStatusCode();
1010  // - We encode everything pre-header as integer "401". We include a 100-continue request as "pre-header".
1011  // - Status codes out of the acceptable range are labeled "402"
1012  // - Otherwise, we store it in the array shifted by 200 (to avoid more sparsity)
1013  if (sc < 0 || kind == OpKind::Start || sc == 100) {
1014  sc = 401;
1015  } else if (sc < 200 || sc >= 600) {
1016  sc = 402;
1017  } else {
1018  sc -= 200;
1019  }
1020  auto [bytes, pre_headers, post_headers, pause_duration] = op.StatisticsReset();
1021  auto &op_stats = m_ops[static_cast<int>(op.GetVerb())][sc];
1022  op_stats.m_bytes.fetch_add(bytes, std::memory_order_relaxed);
1023  op_stats.m_duration.fetch_add((sc == 401) ? pre_headers.count() : post_headers.count(), std::memory_order_relaxed);
1024  op_stats.m_pause_duration.fetch_add(pause_duration.count(), std::memory_order_relaxed);
1025  if (pre_headers != std::chrono::steady_clock::duration::zero() && sc != 401) {
1026  auto &old_stats = m_ops[static_cast<int>(op.GetVerb())][401];
1027  old_stats.m_duration.fetch_add(pre_headers.count(), std::memory_order_relaxed);
1028  }
1029  switch (kind) {
1030  case OpKind::ConncallTimeout:
1031  op_stats.m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1032  break;
1033  case OpKind::ClientTimeout:
1034  op_stats.m_client_timeout.fetch_add(1, std::memory_order_relaxed);
1035  break;
1036  case OpKind::Error:
1037  op_stats.m_error.fetch_add(1, std::memory_order_relaxed);
1038  break;
1039  case OpKind::Finish:
1040  op_stats.m_finished.fetch_add(1, std::memory_order_relaxed);
1041  break;
1042  case OpKind::Start:
1043  op_stats.m_started.fetch_add(1, std::memory_order_relaxed);
1044  break;
1045  case OpKind::ServerTimeout:
1046  op_stats.m_server_timeout.fetch_add(1, std::memory_order_relaxed);
1047  break;
1048  case OpKind::Update:
1049  break;
1050  }
1051 }
1052 
1053 void
1054 CurlWorker::Start(std::unique_ptr<XrdClHttp::CurlWorker> self, std::thread tid)
1055 {
1056  {
1057  std::unique_lock lock(m_workers_mutex);
1058  m_workers.emplace_back(std::move(self));
1059  m_self_tid = std::move(tid);
1060  }
1061  std::unique_lock lock(m_start_lock);
1062  m_start_complete = true;
1063  m_start_complete_cv.notify_one();
1064 }
1065 
1066 void
1068 {
1069  {
1070  std::unique_lock lock(myself->m_start_lock);
1071  myself->m_start_complete_cv.wait(lock, [&]{return myself->m_start_complete;});
1072  }
1073  try {
1074  myself->Run();
1075  } catch (...) {
1076  myself->m_logger->Warning(kLogXrdClHttp, "Curl worker got an exception");
1077  {
1078  std::unique_lock lock(m_workers_mutex);
1079  auto iter = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<XrdClHttp::CurlWorker> &worker){return worker.get() == myself;});
1080  m_workers.erase(iter);
1081  }
1082  }
1083 }
1084 
1085 void
1087  int max_pending = 50;
1088  XrdCl::DefaultEnv::GetEnv()->GetInt("HttpMaxPendingOps", max_pending);
1089  m_continue_queue.reset(new HandlerQueue(max_pending));
1090  auto &queue = *m_queue.get();
1091  m_logger->Debug(kLogXrdClHttp, "Started a curl worker");
1092 
1093  CURLM *multi_handle = curl_multi_init();
1094  if (multi_handle == nullptr) {
1095  throw std::runtime_error("Failed to create curl multi-handle");
1096  }
1097 
1098  int running_handles = 0;
1099  time_t last_maintenance = time(NULL);
1100  CURLMcode mres = CURLM_OK;
1101 
1102  // Map from a file descriptor that has an outstanding broker request
1103  // to the corresponding CURL handle.
1104  std::unordered_map<int, WaitingForBroker> broker_reqs;
1105  std::vector<struct curl_waitfd> waitfds;
1106 
1107  bool want_shutdown = false;
1108  while (!want_shutdown) {
1109  m_last_completed_cycle.store(std::chrono::system_clock::now().time_since_epoch().count());
1110  auto oldest_op = std::chrono::system_clock::now();
1111  for (const auto &entry : m_op_map) {
1112  OpRecord(*entry.second.first, OpKind::Update);
1113  if (entry.second.second < oldest_op) {
1114  oldest_op = entry.second.second;
1115  }
1116  }
1117  m_oldest_op.store(oldest_op.time_since_epoch().count());
1118 
1119  // Try continuing any available handles that have more data
1120  while (true) {
1121  auto op = m_continue_queue->TryConsume();
1122  if (!op) {
1123  break;
1124  }
1125  // Avoid race condition where external thread added a continue operation to queue
1126  // while the curl worker thread failed the transfer.
1127  if (op->IsDone()) {
1128  m_logger->Debug(kLogXrdClHttp, "Ignoring continuation of operation that has already completed");
1129  continue;
1130  }
1131  m_logger->Debug(kLogXrdClHttp, "Continuing the curl handle from op %p on thread %d", op.get(), getthreadid());
1132  auto curl = op->GetCurlHandle();
1133  if (!op->ContinueHandle()) {
1134  op->Fail(XrdCl::errInternal, 0, "Failed to continue the curl handle for the operation");
1135  OpRecord(*op, OpKind::Error);
1136  op->ReleaseHandle();
1137  if (curl) {
1138  curl_multi_remove_handle(multi_handle, curl);
1139  curl_easy_cleanup(curl);
1140  m_op_map.erase(curl);
1141  }
1142  running_handles -= 1;
1143  continue;
1144  } else {
1145  auto iter = m_op_map.find(curl);
1146  if (iter != m_op_map.end()) iter->second.second = std::chrono::system_clock::now();
1147  }
1148  }
1149  // Consume from the shared new operation queue
1150  while (running_handles < static_cast<int>(m_max_ops)) {
1151  auto op = running_handles == 0 ? queue.Consume(std::chrono::seconds(1)) : queue.TryConsume();
1152  if (!op) {
1153  break;
1154  }
1155  auto curl = queue.GetHandle();
1156  if (curl == nullptr) {
1157  m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1158  op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1159  continue;
1160  }
1161  try {
1162  auto rv = op->Setup(curl, *this);
1163  if (!rv) {
1164  m_logger->Debug(kLogXrdClHttp, "Failed to setup the curl handle");
1165  op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the operation");
1166  continue;
1167  }
1168  if (!op->FinishSetup(curl)) {
1169  m_logger->Debug(kLogXrdClHttp, "Failed to finish setup of the curl handle");
1170  op->Fail(XrdCl::errInternal, ENOMEM, "Failed to finish setup of the curl handle for the operation");
1171  continue;
1172  }
1173  } catch (...) {
1174  m_logger->Debug(kLogXrdClHttp, "Unable to setup the curl handle");
1175  op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the operation");
1176  continue;
1177  }
1178  op->SetContinueQueue(m_continue_queue);
1179 
1180  if (op->IsDone()) {
1181  continue;
1182  }
1183  m_op_map[curl] = {op, std::chrono::system_clock::now()};
1184 
1185  // If the operation requires the result of the OPTIONS verb to function, then
1186  // we add that to the multi-handle instead, chaining the two calls together.
1187  if (op->RequiresOptions()) {
1188  std::string modified_url;
1189  std::shared_ptr<CurlOptionsOp> options_op(
1190  new CurlOptionsOp(
1191  curl, op,
1192  std::string(
1193  VerbsCache::GetUrlKey(op->GetUrl(), modified_url)
1194  ),
1195  m_logger, op->GetConnCalloutFunc()
1196  )
1197  );
1198  // Note this `curl` variable is not local to the conditional; it is the curl handle of the
1199  // CurlOptionsOp and will be added below to the multi-handle, causing it - not the parent's
1200  // curl handle - to be executed.
1201  curl = queue.GetHandle();
1202  if (curl == nullptr) {
1203  m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1204  op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1205  OpRecord(*op, OpKind::Error);
1206  continue;
1207  }
1208  auto rv = options_op->Setup(curl, *this);
1209  if (!rv) {
1210  m_logger->Debug(kLogXrdClHttp, "Failed to allocate a curl handle for OPTIONS");
1211  continue;
1212  }
1213  m_op_map[curl] = {options_op, std::chrono::system_clock::now()};
1214  OpRecord(*options_op, OpKind::Start);
1215  running_handles += 1;
1216  } else {
1217  OpRecord(*op, OpKind::Start);
1218  }
1219 
1220  auto mres = curl_multi_add_handle(multi_handle, curl);
1221  if (mres != CURLM_OK) {
1222  m_logger->Debug(kLogXrdClHttp, "Unable to add operation to the curl multi-handle");
1223  op->Fail(XrdCl::errInternal, mres, "Unable to add operation to the curl multi-handle");
1224  OpRecord(*op, OpKind::Error);
1225  continue;
1226  }
1227  m_logger->Debug(kLogXrdClHttp, "Added request for URL %s to worker thread for processing", op->GetUrl().c_str());
1228  running_handles += 1;
1229  }
1230 
1231  // Maintain the periodic reporting of thread activity and fail any operations
1232  // that have expired / timed out.
1233  time_t now = time(NULL);
1234  time_t next_maintenance = last_maintenance + m_maintenance_period.load(std::memory_order_relaxed);
1235  if (now >= next_maintenance) {
1236  m_queue->Expire();
1237  m_continue_queue->Expire();
1238  m_logger->Debug(kLogXrdClHttp, "Curl worker thread %d is running %d operations",
1239  getthreadid(), running_handles);
1240  last_maintenance = now;
1241 
1242  // Timeout all the pending broker requests.
1243  std::vector<std::pair<int, CURL *>> expired_ops;
1244  for (const auto &entry : broker_reqs) {
1245  if (entry.second.expiry < now) {
1246  expired_ops.emplace_back(entry.first, entry.second.curl);
1247  }
1248  }
1249  for (const auto &entry : expired_ops) {
1250  auto iter = m_op_map.find(entry.second);
1251  if (iter == m_op_map.end()) {
1252  m_logger->Warning(kLogXrdClHttp, "Found an expired curl handle with no corresponding operation!");
1253  } else {
1254 
1255  CurlOptionsOp *options_op = nullptr;
1256  if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1257  auto parent_op = options_op->GetOperation();
1258  bool parent_op_failed = false;
1259  if (parent_op->IsRedirect()) {
1260  std::string target;
1261  if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1262  auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1263  if (iter != m_op_map.end()) {
1264  OpRecord(*iter->second.first, OpKind::Error);
1265  iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1266  m_op_map.erase(iter);
1267  running_handles -= 1;
1268  }
1269  parent_op_failed = true;
1270  } else {
1271  OpRecord(*parent_op, OpKind::Start);
1272  }
1273  } else {
1274  OpRecord(*parent_op, OpKind::Start);
1275  }
1276  if (!parent_op_failed){
1277  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1278  }
1279  }
1280 
1281  iter->second.first->Fail(XrdCl::errConnectionError, 1, "Timeout: connection never provided for request");
1282  iter->second.first->ReleaseHandle();
1283  OpRecord(*(iter->second.first), OpKind::ConncallTimeout);
1284  m_op_map.erase(entry.second);
1285  curl_easy_cleanup(entry.second);
1286  running_handles -= 1;
1287  }
1288  broker_reqs.erase(entry.first);
1289  m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1290  }
1291 
1292  // Cleanup the fake connection cache entries.
1294  }
1295 
1296  waitfds.clear();
1297  waitfds.resize(3 + broker_reqs.size());
1298 
1299  waitfds[0].fd = queue.PollFD();
1300  waitfds[0].events = CURL_WAIT_POLLIN;
1301  waitfds[0].revents = 0;
1302  waitfds[1].fd = m_continue_queue->PollFD();
1303  waitfds[1].events = CURL_WAIT_POLLIN;
1304  waitfds[1].revents = 0;
1305  waitfds[2].fd = m_shutdown_pipe_r;
1306  waitfds[2].revents = 0;
1307  waitfds[2].events = CURL_WAIT_POLLIN | CURL_WAIT_POLLPRI;
1308 
1309  int idx = 3;
1310  for (const auto &entry : broker_reqs) {
1311  waitfds[idx].fd = entry.first;
1312  waitfds[idx].events = CURL_WAIT_POLLIN|CURL_WAIT_POLLPRI;
1313  waitfds[idx].revents = 0;
1314  idx += 1;
1315  }
1316 
1317  long timeo;
1318  curl_multi_timeout(multi_handle, &timeo);
1319  // These commented-out lines are purposely left; will need to revisit after the 0.9.1 release;
1320  // for now, they are too verbose on RHEL7.
1321  //m_logger->Debug(kLogXrdClHttp, "Curl advises a timeout of %ld ms", timeo);
1322  if (running_handles && timeo == -1) {
1323  // Bug workaround: we've seen RHEL7 libcurl have a race condition where it'll not
1324  // set a timeout while doing the DNS lookup; assume that if there are running handles
1325  // but no timeout, we've hit this bug.
1326  //m_logger->Debug(kLogXrdClHttp, "Will sleep for up to 50ms");
1327  mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1328  } else {
1329  //m_logger->Debug(kLogXrdClHttp, "Will sleep for up to %d seconds", max_sleep_time);
1330  //mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), max_sleep_time*1000, nullptr);
1331  // Temporary test: we've been seeing DNS lookups timeout on additional platforms. Switch to always
1332  // poll as curl_multi_wait doesn't seem to get notified when DNS lookups are done.
1333  mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50, nullptr);
1334  }
1335  if (mres != CURLM_OK) {
1336  m_logger->Warning(kLogXrdClHttp, "Failed to wait on multi-handle: %d", mres);
1337  }
1338 
1339  // Iterate through the waiting broker callbacks.
1340  for (const auto &entry : waitfds) {
1341  // Ignore the queue's poll fd.
1342  if (waitfds[0].fd == entry.fd || waitfds[1].fd == entry.fd) {
1343  continue;
1344  }
1345  // Handle shutdown requests
1346  if ((waitfds[2].fd == entry.fd) && entry.revents) {
1347  want_shutdown = true;
1348  break;
1349  }
1350  if ((entry.revents & CURL_WAIT_POLLIN) != CURL_WAIT_POLLIN) {
1351  continue;
1352  }
1353  auto handle = broker_reqs[entry.fd].curl;
1354  auto iter = m_op_map.find(handle);
1355  if (iter == m_op_map.end()) {
1356  m_logger->Warning(kLogXrdClHttp, "Internal error: broker responded on FD %d but no corresponding curl operation", entry.fd);
1357  broker_reqs.erase(entry.fd);
1358  m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1359  continue;
1360  }
1361  std::string err;
1362  auto result = iter->second.first->WaitSocketCallback(err);
1363  if (result == -1) {
1364  m_logger->Warning(kLogXrdClHttp, "Error when invoking the broker callback: %s", err.c_str());
1365 
1366  CurlOptionsOp *options_op = nullptr;
1367  if ((options_op = dynamic_cast<CurlOptionsOp*>(iter->second.first.get())) != nullptr) {
1368  auto parent_op = options_op->GetOperation();
1369  bool parent_op_failed = false;
1370  if (parent_op->IsRedirect()) {
1371  std::string target;
1372  if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1373  auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1374  if (iter != m_op_map.end()) {
1375  OpRecord(*iter->second.first, OpKind::Error);
1376  iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1377  m_op_map.erase(iter);
1378  running_handles -= 1;
1379  }
1380  parent_op_failed = true;
1381  } else {
1382  OpRecord(*parent_op, OpKind::Start);
1383  }
1384  } else {
1385  OpRecord(*parent_op, OpKind::Start);
1386  }
1387  if (!parent_op_failed){
1388  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1389  }
1390  }
1391 
1392  iter->second.first->Fail(XrdCl::errErrorResponse, 1, err);
1393  OpRecord(*iter->second.first, OpKind::Error);
1394  m_op_map.erase(handle);
1395  broker_reqs.erase(entry.fd);
1396  m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1397  running_handles -= 1;
1398  } else {
1399  broker_reqs.erase(entry.fd);
1400  curl_multi_add_handle(multi_handle, handle);
1401  m_conncall_success.fetch_add(1, std::memory_order_relaxed);
1402  }
1403  }
1404 
1405  // Do maintenance on the multi-handle
1406  int still_running;
1407  auto mres = curl_multi_perform(multi_handle, &still_running);
1408  if (mres == CURLM_CALL_MULTI_PERFORM) {
1409  continue;
1410  } else if (mres != CURLM_OK) {
1411  m_logger->Warning(kLogXrdClHttp, "Failed to perform multi-handle operation: %d", mres);
1412  break;
1413  }
1414 
1415  CURLMsg *msg;
1416  do {
1417  int msgq = 0;
1418  msg = curl_multi_info_read(multi_handle, &msgq);
1419  if (msg && (msg->msg == CURLMSG_DONE)) {
1420  if (!msg->easy_handle) {
1421  m_logger->Warning(kLogXrdClHttp, "Logic error: got a callback for a null handle");
1422  mres = CURLM_BAD_EASY_HANDLE;
1423  break;
1424  }
1425  auto iter = m_op_map.find(msg->easy_handle);
1426  if (iter == m_op_map.end()) {
1427  m_logger->Error(kLogXrdClHttp, "Logic error: got a callback for an entry that doesn't exist");
1428  mres = CURLM_BAD_EASY_HANDLE;
1429  break;
1430  }
1431  auto op = iter->second.first;
1432  auto res = msg->data.result;
1433  bool keep_handle = false;
1434  bool waiting_on_callout = false;
1435  if (res == CURLE_OK) {
1436  auto sc = op->GetStatusCode();
1437  OpRecord(*op, OpKind::Finish);
1438  if (HTTPStatusIsError(sc)) {
1439  auto httpErr = HTTPStatusConvert(sc);
1440  op->Fail(httpErr.first, httpErr.second, op->GetStatusMessage());
1441  op->ReleaseHandle();
1442  // If this was a failed CurlOptionsOp, then we re-activate the parent handle.
1443  // If the parent handle was stopped at a redirect that now returns failure, then
1444  // we'll clean it up.
1445  CurlOptionsOp *options_op = nullptr;
1446  if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1447  auto parent_op = options_op->GetOperation();
1448  bool parent_op_failed = false;
1449  if (parent_op->IsRedirect()) {
1450  std::string target;
1451  if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1452  OpRecord(*parent_op, OpKind::Error);
1453  m_op_map.erase(options_op->GetParentCurlHandle());
1454  running_handles -= 1;
1455  parent_op_failed = true;
1456  } else {
1457  OpRecord(*parent_op, OpKind::Start);
1458  }
1459  } else {
1460  OpRecord(*parent_op, OpKind::Start);
1461  }
1462  // Have curl execute the parent operation
1463  if (!parent_op_failed) {
1464  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1465  }
1466  }
1467  // The curl operation was successful, it's just the HTTP request failed; recycle the handle.
1468  queue.RecycleHandle(iter->first);
1469  } else {
1470  CurlOptionsOp *options_op = nullptr;
1471  // If this was a successful OPTIONS op, invoke the parent operation.
1472  if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get()))) {
1473  options_op->Success();
1474  options_op->ReleaseHandle();
1475  // Note: op is scoped external to the conditional block
1476  op = options_op->GetOperation();
1477  op->OptionsDone();
1478  OpRecord(*op, OpKind::Start);
1479  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1480  curl_multi_remove_handle(multi_handle, iter->first);
1481  queue.RecycleHandle(iter->first);
1482  }
1483  // Check to see if the operation ended in a redirect (note: this might)
1484  // be invoked a second time if this was the parent operation of an OPTIONS
1485  // op.
1486  if (op->IsRedirect()) {
1487  std::string target;
1488  switch (op->Redirect(target)) {
1489  case CurlOperation::RedirectAction::Fail:
1490  if (options_op) {
1491  // In this case, we failed immediately after an OPTIONS finished.
1492  // Since there's a Start recorded after the OPTIONS processing, we
1493  // must record an error.
1494  // In the non-OPTIONS case, we never recorded a second start and
1495  // don't need a matching failure.
1496  OpRecord(*op, OpKind::Error);
1497  }
1498  keep_handle = false;
1499  break;
1500  case CurlOperation::RedirectAction::Reinvoke:
1501  if (!options_op) {
1502  // In this case, the redirect occurred without any prior
1503  // OPTIONS call. This implies that `op` is the original call
1504  // and we need to restart it later and record another op start.
1505  keep_handle = true;
1506  OpRecord(*op, OpKind::Start);
1507  }
1508  break;
1509  case CurlOperation::RedirectAction::ReinvokeAfterAllow:
1510  {
1511  // The redirect resulted in a new endpoint where the cache lookup failed;
1512  // we need to know what HTTP verbs are in the server's Allow list before this
1513  // operation can continue. Inject a new CurlOptionsOp and chain it to the one
1514  // being processed. Once the OPTIONS request is done, then we'll restart this
1515  // operation.
1516  std::string modified_url;
1517  target = VerbsCache::GetUrlKey(target, modified_url);
1518  options_op = new CurlOptionsOp(iter->first, op, target, m_logger, op->GetConnCalloutFunc());
1519  std::shared_ptr<CurlOperation> new_op(options_op);
1520  auto curl = queue.GetHandle();
1521  if (curl == nullptr) {
1522  m_logger->Debug(kLogXrdClHttp, "Unable to allocate a curl handle");
1523  op->Fail(XrdCl::errInternal, ENOMEM, "Unable to get allocate a curl handle");
1524  keep_handle = false;
1525  options_op = nullptr;
1526  break;
1527  }
1528  OpRecord(*new_op, OpKind::Start);
1529  try {
1530  auto rv = new_op->Setup(curl, *this);
1531  if (!rv) {
1532  m_logger->Debug(kLogXrdClHttp, "Unable to configure a curl handle for OPTIONS");
1533  keep_handle = false;
1534  options_op = nullptr;
1535  break;
1536  }
1537  } catch (...) {
1538  m_logger->Debug(kLogXrdClHttp, "Unable to setup the curl handle for the OPTIONS operation");
1539  new_op->Fail(XrdCl::errInternal, ENOMEM, "Failed to setup the curl handle for the OPTIONS operation");
1540  OpRecord(*new_op, OpKind::Error);
1541  keep_handle = false;
1542  break;
1543  }
1544  new_op->SetContinueQueue(m_continue_queue);
1545  m_op_map[curl] = {new_op, std::chrono::system_clock::now()};
1546  auto mres = curl_multi_add_handle(multi_handle, curl);
1547  if (mres != CURLM_OK) {
1548  m_logger->Debug(kLogXrdClHttp, "Unable to add OPTIONS operation to the curl multi-handle: %s", curl_multi_strerror(mres));
1549  op->Fail(XrdCl::errInternal, mres, "Unable to add OPTIONS operation to the curl multi-handle");
1550  OpRecord(*new_op, OpKind::Error);
1551  break;
1552  }
1553  running_handles += 1;
1554  m_logger->Debug(kLogXrdClHttp, "Invoking the OPTIONS operation before redirect to %s", target.c_str());
1555  // The original curl operation needs to be kept around. Note that because options_op
1556  // is non-nil, we won't re-add the handle to the multi-handle.
1557  keep_handle = true;
1558  }
1559  }
1560  int callout_socket = op->WaitSocket();
1561  if ((waiting_on_callout = callout_socket >= 0)) {
1562  auto expiry = time(nullptr) + 20;
1563  m_logger->Debug(kLogXrdClHttp, "Creating a callout wait request on socket %d", callout_socket);
1564  broker_reqs[callout_socket] = {iter->first, expiry};
1565  m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1566  }
1567  } else if (options_op) {
1568  // In this case, the OPTIONS call happened before the parent operation was started.
1569  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1570  }
1571  if (keep_handle) {
1572  curl_multi_remove_handle(multi_handle, iter->first);
1573  if (!waiting_on_callout && !options_op) {
1574  curl_multi_add_handle(multi_handle, iter->first);
1575  }
1576  } else if (!options_op) {
1577  op->Success();
1578  op->ReleaseHandle();
1579  // If the handle was successful, then we can recycle it.
1580  queue.RecycleHandle(iter->first);
1581  }
1582  }
1583  } else if (res == CURLE_COULDNT_CONNECT && op->UseConnectionCallout() && !op->GetTriedBoker()) {
1584  // In this case, we need to use the broker and the curl handle couldn't reuse
1585  // an existing socket.
1586  keep_handle = true;
1587  op->SetTriedBoker(); // Flag to ensure we try a connection only once per operation.
1588  std::string err;
1589  int wait_socket = -1;
1590  if (!op->StartConnectionCallout(err) || (wait_socket=op->WaitSocket()) == -1) {
1591  m_logger->Error(kLogXrdClHttp, "Failed to start broker-based connection: %s", err.c_str());
1592  op->ReleaseHandle();
1593  keep_handle = false;
1594  } else {
1595  curl_multi_remove_handle(multi_handle, iter->first);
1596  auto expiry = time(nullptr) + 20;
1597  m_logger->Debug(kLogXrdClHttp, "Curl operation requires a new TCP socket; waiting for callout to respond on socket %d", wait_socket);
1598  broker_reqs[wait_socket] = {iter->first, expiry};
1599  m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1600  }
1601  } else {
1602  if (res == CURLE_ABORTED_BY_CALLBACK || res == CURLE_WRITE_ERROR) {
1603  // We cannot invoke the failure from within a callback as the curl thread and
1604  // original thread of execution may fight over the ownership of the handle memory.
1605  switch (op->GetError()) {
1606  case CurlOperation::OpError::ErrHeaderTimeout:
1607 #ifdef HAVE_XPROTOCOL_TIMEREXPIRED
1608  op->Fail(XrdCl::errOperationExpired, 0, "Origin did not respond with headers within timeout");
1609 #else
1610  op->Fail(XrdCl::errOperationExpired, 0, "Origin did not respond within timeout");
1611 #endif
1612  OpRecord(*op, OpKind::Error);
1613  break;
1614  case CurlOperation::OpError::ErrCallback: {
1615  auto [ecode, emsg] = op->GetCallbackError();
1616  op->Fail(XrdCl::errErrorResponse, ecode, emsg);
1617  OpRecord(*op, OpKind::Error);
1618  break;
1619  }
1620  case CurlOperation::OpError::ErrOperationTimeout:
1621  op->Fail(XrdCl::errOperationExpired, 0, "Operation timed out");
1622  OpRecord(*op, op->IsPaused() ? OpKind::ClientTimeout : OpKind::ServerTimeout);
1623  break;
1624  case CurlOperation::OpError::ErrTransferSlow:
1625  op->Fail(XrdCl::errOperationExpired, 0, "Transfer speed below minimum threshold");
1626  OpRecord(*op, OpKind::ServerTimeout);
1627  break;
1628  case CurlOperation::OpError::ErrTransferClientStall:
1629  op->Fail(XrdCl::errOperationExpired, 0, "Transfer stalled for too long");
1630  OpRecord(*op, OpKind::ClientTimeout);
1631  break;
1632  case CurlOperation::OpError::ErrTransferStall:
1633  op->Fail(XrdCl::errOperationExpired, 0, "Transfer stalled for too long");
1634  OpRecord(*op, OpKind::ServerTimeout);
1635  break;
1636  case CurlOperation::OpError::ErrNone:
1637  op->Fail(XrdCl::errInternal, 0, "Operation was aborted without recording an abort reason");
1638  OpRecord(*op, OpKind::Error);
1639  break;
1640  };
1641  CurlOptionsOp *options_op = nullptr;
1642  if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1643  auto parent_op = options_op->GetOperation();
1644  bool parent_op_failed = false;
1645  if (parent_op->IsRedirect()) {
1646  std::string target;
1647  if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1648  auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1649  if (iter != m_op_map.end()) {
1650  OpRecord(*iter->second.first, OpKind::Error);
1651  iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1652  m_op_map.erase(iter);
1653  running_handles -= 1;
1654  }
1655  parent_op_failed = true;
1656  } else {
1657  OpRecord(*parent_op, OpKind::Start);
1658  }
1659  } else {
1660  OpRecord(*parent_op, OpKind::Start);
1661  }
1662  if (!parent_op_failed){
1663  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1664  }
1665  }
1666  } else {
1667  auto xrdCode = CurlCodeConvert(res);
1668  const auto curl_err = op->GetCurlErrorMessage();
1669  const char *curl_easy_err = curl_easy_strerror(res);
1670  const std::string fail_err = !curl_err.empty() ? curl_err : curl_easy_err;
1671  m_logger->Debug(kLogXrdClHttp, "Curl generated an error: %s (%d)", fail_err.c_str(), res);
1672  op->Fail(xrdCode.first, xrdCode.second, fail_err);
1673  OpRecord(*op, OpKind::Error);
1674  CurlOptionsOp *options_op = nullptr;
1675  if ((options_op = dynamic_cast<CurlOptionsOp*>(op.get())) != nullptr) {
1676  auto parent_op = options_op->GetOperation();
1677  bool parent_op_failed = false;
1678  if (parent_op->IsRedirect()) {
1679  std::string target;
1680  if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1681  auto iter = m_op_map.find(options_op->GetParentCurlHandle());
1682  if (iter != m_op_map.end()) {
1683  OpRecord(*iter->second.first, OpKind::Error);
1684  iter->second.first->Fail(XrdCl::errErrorResponse, 0, "Failed to send OPTIONS to redirect target");
1685  m_op_map.erase(iter);
1686  running_handles -= 1;
1687  }
1688  parent_op_failed = true;
1689  }
1690  }
1691  if (!parent_op_failed){
1692  curl_multi_add_handle(multi_handle, options_op->GetParentCurlHandle());
1693  }
1694  }
1695  }
1696  op->ReleaseHandle();
1697  }
1698  if (!keep_handle) {
1699  curl_multi_remove_handle(multi_handle, iter->first);
1700  if (res != CURLE_OK) {
1701  curl_easy_cleanup(iter->first);
1702  }
1703  for (auto &req : broker_reqs) {
1704  if (req.second.curl == iter->first) {
1705  m_logger->Warning(kLogXrdClHttp, "Curl handle finished while a broker operation was outstanding");
1706  m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1707  }
1708  }
1709  m_op_map.erase(iter);
1710  running_handles -= 1;
1711  }
1712  }
1713  } while (msg);
1714  }
1715 
1716  for (auto map_entry : m_op_map) {
1717  if (mres) {
1718  map_entry.second.first->Fail(XrdCl::errInternal, mres, curl_multi_strerror(mres));
1719  OpRecord(*map_entry.second.first, OpKind::Error);
1720  }
1721  if (multi_handle && map_entry.first) curl_multi_remove_handle(multi_handle, map_entry.first);
1722  }
1723 
1724  m_queue->ReleaseHandles();
1725  curl_multi_cleanup(multi_handle);
1726 }
1727 
1728 void
1729 CurlWorker::Shutdown()
1730 {
1731  m_queue->Shutdown();
1732  if (m_shutdown_pipe_w == -1) {
1733  m_logger->Debug(kLogXrdClHttp, "Curl worker shutdown prior to launch of thread");
1734  return;
1735  }
1736  close(m_shutdown_pipe_w);
1737  m_shutdown_pipe_w = -1;
1738 
1739  // wait for worker thread to exit
1740  m_self_tid.join();
1741 
1742  {
1743  std::unique_lock lk(m_worker_stats_mutex);
1744  m_workers_last_completed_cycle[m_stats_offset] = nullptr;
1745  m_workers_oldest_op[m_stats_offset] = nullptr;
1746  }
1747  m_logger->Debug(kLogXrdClHttp, "Curl worker thread shutdown has completed.");
1748 }
1749 
1750 void
1751 CurlWorker::ShutdownAll()
1752 {
1753  std::unique_lock lock(m_workers_mutex);
1754  for (auto &worker : m_workers) {
1755  worker->Shutdown();
1756  }
1757 }
1758 
1759 CurlWorker::initcontrol::initcontrol()
1760 {
1761  curl_global_init(CURL_GLOBAL_DEFAULT);
1762 }
1763 
1764 CurlWorker::initcontrol::~initcontrol()
1765 {
1766  ShutdownAll();
1767  curl_global_cleanup();
1768 }
@ kXR_InvalidRequest
Definition: XProtocol.hh:1038
@ kXR_Impossible
Definition: XProtocol.hh:1063
@ kXR_TimerExpired
Definition: XProtocol.hh:1067
@ kXR_NotAuthorized
Definition: XProtocol.hh:1042
@ kXR_NotFound
Definition: XProtocol.hh:1043
@ kXR_FileLocked
Definition: XProtocol.hh:1035
@ kXR_overQuota
Definition: XProtocol.hh:1053
@ kXR_Conflict
Definition: XProtocol.hh:1064
@ kXR_ServerError
Definition: XProtocol.hh:1044
@ kXR_Overloaded
Definition: XProtocol.hh:1056
@ kXR_ReqTimedOut
Definition: XProtocol.hh:1066
std::pair< uint16_t, uint32_t > CurlCodeConvert(CURLcode res)
void CURL
std::string obfuscateAuth(const std::string &input)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:48
#define write(a, b, c)
Definition: XrdPosix.hh:123
void getline(uchar *buff, int blen)
int emsg(int rc, char *msg)
@ Error
bool Set(ChecksumType ctype, const std::array< unsigned char, g_max_checksum_length > &value)
virtual void Success()=0
virtual void OptionsDone()
bool FinishSetup(CURL *curl)
const std::string & GetUrl() const
std::pair< XErrorCode, std::string > GetCallbackError() const
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
std::string GetCurlErrorMessage() const
virtual void ReleaseHandle()
virtual bool RequiresOptions() const
static void CleanupDnsCache()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
virtual bool ContinueHandle()
std::string GetStatusMessage() const
CURL * GetCurlHandle() const
CreateConnCalloutType GetConnCalloutFunc() const
Definition: XrdClHttpOps.hh:99
OpError GetError() const
virtual RedirectAction Redirect(std::string &target)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue)
bool StartConnectionCallout(std::string &err)
virtual bool Setup(CURL *curl, CurlWorker &)
CURL * GetParentCurlHandle() const
std::shared_ptr< CurlOperation > GetOperation() const
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
static void RunStatic(CurlWorker *myself)
void Start(std::unique_ptr< XrdClHttp::CurlWorker > self, std::thread tid)
static std::string GetMonitoringJson()
HandlerQueue(unsigned max_pending_ops)
void SetMultipartSeparator(const std::string_view &sep)
static bool Base64Decode(std::string_view input, std::array< unsigned char, 32 > &output)
static void ParseDigest(const std::string &digest, XrdClHttp::ChecksumInfo &info)
static bool Canonicalize(std::string &headerName)
bool Parse(const std::string &headers)
static std::string ChecksumTypeToDigestName(XrdClHttp::ChecksumType type)
static std::string_view GetUrlKey(const std::string &url, std::string &modified_url)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
@ DumpMsg
print details of the request and responses
Definition: XrdClLog.hh:113
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
CURL * GetHandle(bool verbose)
bool HTTPStatusIsError(unsigned status)
std::string_view ltrim_view(const std::string_view &input_view)
std::string_view trim_view(const std::string_view &input_view)
const uint16_t errUnknown
Unknown error.
Definition: XrdClStatus.hh:50
const uint16_t errInvalidAddr
Definition: XrdClStatus.hh:71
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errLoginFailed
Definition: XrdClStatus.hh:87
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
const uint16_t errNone
No error.
Definition: XrdClStatus.hh:48
const uint64_t kLogXrdClHttp