35 #include <XrdVersion.hh>
37 #include <curl/curl.h>
38 #include <openssl/bio.h>
39 #include <openssl/evp.h>
46 #include <sys/syscall.h>
47 #include <sys/types.h>
58 thread_local std::vector<CURL*> HandlerQueue::m_handles;
59 std::atomic<unsigned> CurlWorker::m_maintenance_period = 5;
60 std::vector<std::unique_ptr<XrdClHttp::CurlWorker>> CurlWorker::m_workers;
61 std::mutex CurlWorker::m_workers_mutex;
64 std::atomic<uint64_t> CurlWorker::m_conncall_errors = 0;
65 std::atomic<uint64_t> CurlWorker::m_conncall_req = 0;
66 std::atomic<uint64_t> CurlWorker::m_conncall_success = 0;
67 std::atomic<uint64_t> CurlWorker::m_conncall_timeout = 0;
68 decltype(CurlWorker::m_ops) CurlWorker::m_ops = {};
69 std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_last_completed_cycle;
70 std::vector<std::atomic<std::chrono::system_clock::rep>*> CurlWorker::m_workers_oldest_op;
71 std::mutex CurlWorker::m_worker_stats_mutex;
74 std::atomic<uint64_t> HandlerQueue::m_ops_consumed = 0;
75 std::atomic<uint64_t> HandlerQueue::m_ops_produced = 0;
76 std::atomic<uint64_t> HandlerQueue::m_ops_rejected = 0;
79 CurlWorker::initcontrol CurlWorker::m_initcontrol;
89 #if defined(__APPLE__)
90 uint64_t pth_threadid;
91 pthread_threadid_np(pthread_self(), &pth_threadid);
93 #elif defined(__linux__)
98 return syscall(SYS_gettid);
107 return (status < 100) || (status >= 400);
178 case CURLE_COULDNT_RESOLVE_PROXY:
179 case CURLE_COULDNT_RESOLVE_HOST:
181 case CURLE_LOGIN_DENIED:
186 case CURLE_REMOTE_ACCESS_DENIED:
188 case CURLE_SSL_CONNECT_ERROR:
189 case CURLE_SSL_ENGINE_NOTFOUND:
190 case CURLE_SSL_ENGINE_SETFAILED:
191 case CURLE_SSL_CERTPROBLEM:
192 case CURLE_SSL_CIPHER:
194 case CURLE_SSL_SHUTDOWN_FAILED:
195 case CURLE_SSL_CRL_BADFILE:
196 case CURLE_SSL_ISSUER_ERROR:
197 case CURLE_SSL_CACERT:
201 case CURLE_SEND_ERROR:
202 case CURLE_RECV_ERROR:
204 case CURLE_COULDNT_CONNECT:
205 case CURLE_GOT_NOTHING:
207 case CURLE_OPERATION_TIMEDOUT:
208 #ifdef HAVE_XPROTOCOL_TIMEREXPIRED
213 case CURLE_UNSUPPORTED_PROTOCOL:
214 case CURLE_NOT_BUILT_IN:
216 case CURLE_FAILED_INIT:
218 case CURLE_URL_MALFORMAT:
224 case CURLE_PARTIAL_FILE:
229 case CURLE_READ_ERROR:
230 case CURLE_WRITE_ERROR:
232 case CURLE_RANGE_ERROR:
233 case CURLE_BAD_CONTENT_ENCODING:
235 case CURLE_TOO_MANY_REDIRECTS:
243 if (input.size() > 44 || input.size() % 4 != 0)
return false;
244 if (input.size() == 0)
return true;
246 std::unique_ptr<BIO, decltype(&BIO_free_all)> b64(BIO_new(BIO_f_base64()), &BIO_free_all);
247 BIO_set_flags(b64.get(), BIO_FLAGS_BASE64_NO_NL);
248 std::unique_ptr<BIO, decltype(&BIO_free_all)> bmem(
249 BIO_new_mem_buf(
const_cast<char *
>(input.data()), input.size()), &BIO_free_all);
250 bmem.reset(BIO_push(b64.release(), bmem.release()));
253 size_t expectedLen =
static_cast<size_t>(input.size() * 0.75);
254 if (input[input.size() - 1] ==
'=') {
256 if (input[input.size() - 2] ==
'=') {
261 auto len = BIO_read(bmem.get(), &output[0], output.size());
263 if (len == -1 ||
static_cast<size_t>(len) != expectedLen)
return false;
275 if (m_recv_all_headers) {
276 m_recv_all_headers =
false;
277 m_recv_status_line =
false;
280 if (!m_recv_status_line) {
281 m_recv_status_line =
true;
283 std::stringstream ss(header_line);
286 m_resp_protocol = item;
289 m_status_code = std::stol(item);
293 if (m_status_code < 100 || m_status_code >= 600) {
297 auto cr_loc = item.find(
'\r');
298 if (cr_loc != std::string::npos) {
299 m_resp_message = item.substr(0, cr_loc);
301 m_resp_message = item;
306 if (header_line.empty() || header_line ==
"\n" || header_line ==
"\r\n") {
307 m_recv_all_headers =
true;
311 auto found = header_line.find(
":");
312 if (found == std::string::npos) {
316 std::string header_name = header_line.substr(0, found);
322 while (found < header_line.size()) {
323 if (header_line[found] !=
' ') {
break;}
326 std::string header_value = header_line.substr(found);
329 header_value.erase(header_value.find_last_not_of(
" \r\n\t") + 1);
333 auto iter = m_headers.find(header_name);
334 if (iter == m_headers.end()) {
335 m_headers.insert(iter, {header_name, {header_value}});
337 iter->second.push_back(header_value);
340 if (header_name ==
"Allow") {
341 std::string_view val(header_value);
342 while (!val.empty()) {
343 auto found = val.find(
',');
344 auto method = val.substr(0, found);
345 if (method ==
"PROPFIND") {
349 if (found == std::string_view::npos)
break;
350 val = val.substr(found + 1);
355 }
else if (header_name ==
"Content-Length") {
357 m_content_length = std::stoll(header_value);
362 else if (header_name ==
"Content-Type") {
363 std::string_view val(header_value);
364 auto found = val.find(
";");
365 auto first_type = val.substr(0, found);
366 m_multipart_byteranges = first_type ==
"multipart/byteranges";
367 if (m_multipart_byteranges) {
368 auto remainder = val.substr(found + 1);
369 found = remainder.find(
"boundary=");
370 if (found != std::string_view::npos) {
375 else if (header_name ==
"Content-Range") {
376 auto found = header_value.find(
" ");
377 if (found == std::string::npos) {
380 std::string range_unit = header_value.substr(0, found);
381 if (range_unit !=
"bytes") {
384 auto range_resp = header_value.substr(found + 1);
385 found = range_resp.find(
"/");
386 if (found == std::string::npos) {
389 auto incl_range = range_resp.substr(0, found);
390 found = incl_range.find(
"-");
391 if (found == std::string::npos) {
394 auto first_pos = incl_range.substr(0, found);
396 m_response_offset = std::stoll(first_pos);
400 auto last_pos = incl_range.substr(found + 1);
403 last_byte = std::stoll(last_pos);
407 m_content_length = last_byte - m_response_offset + 1;
409 else if (header_name ==
"Location") {
410 m_location = header_value;
411 }
else if (header_name ==
"Digest") {
414 else if (header_name ==
"Etag")
418 m_etag = header_value;
419 m_etag.erase(remove(m_etag.begin(), m_etag.end(),
'\"'), m_etag.end());
421 else if (header_name ==
"Cache-Control")
423 m_cache_control = header_value;
433 std::string_view view(digest);
434 std::array<unsigned char, 32> checksum_value;
435 std::string digest_lower;
436 while (!view.empty()) {
437 auto nextsep = view.find(
',');
438 auto entry = view.substr(0, nextsep);
439 if (nextsep == std::string_view::npos) {
442 view = view.substr(nextsep + 1);
444 nextsep = entry.find(
'=');
445 auto name = entry.substr(0, nextsep);
446 auto value = entry.substr(nextsep + 1);
447 digest_lower.clear();
448 digest_lower.resize(name.size());
449 std::transform(name.begin(), name.end(), digest_lower.begin(), [](
unsigned char c) {
450 return std::tolower(c);
452 if (digest_lower ==
"md5") {
453 if (value.size() != 24) {
457 info.
Set(XrdClHttp::ChecksumType::kMD5, checksum_value);
459 }
else if (digest_lower ==
"crc32c") {
464 if (value.size() == 8 && value[6] ==
'=' && value[7] ==
'=') {
466 info.
Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
473 val = std::stoul(value.data(), &pos, 16);
477 if (pos == value.size()) {
478 checksum_value[0] = (val >> 24) & 0xFF;
479 checksum_value[1] = (val >> 16) & 0xFF;
480 checksum_value[2] = (val >> 8) & 0xFF;
481 checksum_value[3] = val & 0xFF;
482 info.
Set(XrdClHttp::ChecksumType::kCRC32C, checksum_value);
492 case XrdClHttp::ChecksumType::kMD5:
494 case XrdClHttp::ChecksumType::kCRC32C:
496 case XrdClHttp::ChecksumType::kSHA1:
498 case XrdClHttp::ChecksumType::kSHA256:
506 bool HeaderParser::validHeaderByte(
unsigned char c)
508 const static uint64_t mask_lower = 0 |
509 uint64_t((1<<10)-1) <<
'0' |
515 uint64_t(1) <<
'\'' |
521 const static uint64_t mask_upper = 0 |
522 uint64_t((1<<26)-1) << (
'a'-64) |
523 uint64_t((1<<26)-1) << (
'A'-64) |
524 uint64_t(1) << (
'^'-64) |
525 uint64_t(1) << (
'_'-64) |
526 uint64_t(1) << (
'`'-64) |
527 uint64_t(1) << (
'|'-64) |
528 uint64_t(1) << (
'~'-64);
530 if (c >= 128)
return false;
531 if (c >= 64)
return (uint64_t(1)<<(c-64)) & mask_upper;
532 return (uint64_t(1) << c) & mask_lower;
538 const static int toLower =
'a' -
'A';
539 for (
size_t idx=0; idx<headerName.size(); idx++) {
540 char c = headerName[idx];
541 if (!validHeaderByte(c)) {
544 if (upper &&
'a' <= c && c <=
'z') {
546 }
else if (!upper &&
'A' <= c && c <=
'Z') {
556 m_max_pending_ops(max_pending_ops)
559 auto result = pipe(filedes);
561 throw std::runtime_error(strerror(errno));
563 if (fcntl(filedes[0], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1 || fcntl(filedes[1], F_SETFL, O_NONBLOCK | O_CLOEXEC) == -1) {
566 throw std::runtime_error(strerror(errno));
568 m_read_fd = filedes[0];
569 m_write_fd = filedes[1];
574 bool EnableCurlHeaderDump() {
583 int DumpHeader(
CURL *handle, curl_infotype type,
char *data,
size_t size,
void *clientp) {
585 auto *logger =
static_cast<XrdCl::Log *
>(clientp);
586 if (!logger || !data || size == 0) {
590 const char *direction =
nullptr;
592 case CURLINFO_HEADER_OUT:
595 case CURLINFO_HEADER_IN:
602 const std::string redacted =
obfuscateAuth(std::string(data, size));
603 logger->Debug(
kLogXrdClHttp,
"%s %s", direction, redacted.c_str());
612 for (
size_t idx = 0; idx < input_view.size(); idx++) {
613 if (!isspace(view[view.size() - 1 - idx])) {
614 return view.substr(0, view.size() - idx);
622 for (
size_t idx = 0; idx < input_view.size(); idx++) {
623 if (!isspace(input_view[idx])) {
624 return input_view.substr(idx);
632 auto result = curl_easy_init();
633 if (result ==
nullptr) {
637 curl_easy_setopt(result, CURLOPT_USERAGENT,
"xrdcl-http/" XrdVERSION);
638 curl_easy_setopt(result, CURLOPT_DEBUGFUNCTION, DumpHeader);
641 curl_easy_setopt(result, CURLOPT_VERBOSE, 1L);
645 if (!env->GetString(
"HttpCertFile", ca_file) || ca_file.empty()) {
646 char *x509_ca_file = getenv(
"X509_CERT_FILE");
648 ca_file = std::string(x509_ca_file);
651 if (!ca_file.empty()) {
652 curl_easy_setopt(result, CURLOPT_CAINFO, ca_file.c_str());
655 if (!env->GetString(
"HttpCertDir", ca_dir) || ca_dir.empty()) {
656 char *x509_ca_dir = getenv(
"X509_CERT_DIR");
658 ca_dir = std::string(x509_ca_dir);
661 if (!ca_dir.empty()) {
662 curl_easy_setopt(result, CURLOPT_CAPATH, ca_dir.c_str());
665 curl_easy_setopt(result, CURLOPT_BUFFERSIZE, 32*1024);
672 if (m_handles.size()) {
673 auto result = m_handles.back();
674 m_handles.pop_back();
683 m_handles.push_back(curl);
689 std::unique_lock<std::mutex> lk(m_mutex);
690 auto now = std::chrono::steady_clock::now();
693 for (
auto &op : m_ops) {
694 if (!op->IsPaused())
continue;
696 if (op->TransferStalled(0, now)) {
697 op->ContinueHandle();
701 std::vector<decltype(m_ops)::value_type> expired_ops;
702 unsigned expired_count = 0;
703 auto it = std::remove_if(m_ops.begin(), m_ops.end(),
704 [&](
const std::shared_ptr<CurlOperation> &handler) {
705 auto expired = handler->GetOperationExpiry() < now;
707 expired_ops.push_back(handler);
712 m_ops.erase(it, m_ops.end());
717 unsigned bytes_to_read = expired_count;
718 while (bytes_to_read > 0) {
719 size_t chunk = std::min<size_t>(
sizeof(throwaway), bytes_to_read);
720 ssize_t n =
read(m_read_fd, throwaway, chunk);
723 }
else if (n == -1) {
724 if (errno == EINTR) {
744 for (
auto &handler : expired_ops) {
750 HandlerQueue::Produce(std::shared_ptr<CurlOperation> handler)
752 auto handler_expiry = handler->GetOperationExpiry();
753 std::unique_lock<std::mutex> lk{m_mutex};
754 m_producer_cv.wait_until(lk,
756 [&]{
return m_ops.size() < m_max_pending_ops;}
758 if (std::chrono::steady_clock::now() > handler_expiry) {
761 m_ops_rejected.fetch_add(1, std::memory_order_relaxed);
765 m_ops.push_back(handler);
768 auto result =
write(m_write_fd, ready, 1);
770 if (errno == EINTR) {
772 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
777 throw std::runtime_error(strerror(errno));
783 m_consumer_cv.notify_one();
784 m_ops_produced.fetch_add(1, std::memory_order_relaxed);
787 std::shared_ptr<CurlOperation>
788 HandlerQueue::Consume(std::chrono::steady_clock::duration dur)
790 std::unique_lock<std::mutex> lk(m_mutex);
791 m_consumer_cv.wait_for(lk, dur, [&]{
return m_ops.size() > 0 || m_shutdown;});
792 if (m_shutdown || m_ops.empty()) {
796 std::shared_ptr<CurlOperation> result = m_ops.front();
801 auto result =
read(m_read_fd, ready, 1);
803 if (errno == EINTR) {
805 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
810 throw std::runtime_error(strerror(errno));
816 m_producer_cv.notify_one();
817 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
823 HandlerQueue::GetMonitoringJson()
825 auto consumed = m_ops_consumed.load(std::memory_order_relaxed);
826 auto produced = m_ops_produced.load(std::memory_order_relaxed);
828 "\"produced\":" + std::to_string(produced) +
","
829 "\"consumed\":" + std::to_string(consumed) +
","
830 "\"pending\":" + std::to_string(produced - consumed) +
","
831 "\"rejected\":" + std::to_string(m_ops_rejected.load(std::memory_order_relaxed)) +
835 std::shared_ptr<CurlOperation>
836 HandlerQueue::TryConsume()
838 std::unique_lock<std::mutex> lk(m_mutex);
839 if (m_ops.size() == 0) {
840 std::shared_ptr<CurlOperation> result;
844 std::shared_ptr<CurlOperation> result = m_ops.front();
849 auto result =
read(m_read_fd, ready, 1);
851 if (errno == EINTR) {
853 }
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
858 throw std::runtime_error(strerror(errno));
864 m_producer_cv.notify_one();
865 m_ops_consumed.fetch_add(1, std::memory_order_relaxed);
871 HandlerQueue::Shutdown()
873 std::unique_lock lock(m_mutex);
875 m_consumer_cv.notify_all();
879 HandlerQueue::ReleaseHandles()
881 for (
auto handle : m_handles) {
882 curl_easy_cleanup(handle);
893 std::unique_lock lk(m_worker_stats_mutex);
894 m_stats_offset = m_workers_last_completed_cycle.size();
895 m_workers_last_completed_cycle.push_back(&m_last_completed_cycle);
896 m_workers_oldest_op.push_back(&m_oldest_op);
899 if ((pipe(pipeInfo) == -1) || (fcntl(pipeInfo[0], F_SETFD, FD_CLOEXEC)) || (fcntl(pipeInfo[1], F_SETFD, FD_CLOEXEC))) {
900 throw std::runtime_error(
"Failed to create shutdown monitoring pipe for curl worker");
902 m_shutdown_pipe_r = pipeInfo[0];
903 m_shutdown_pipe_w = pipeInfo[1];
907 env->GetString(
"HttpClientCertFile", m_x509_client_cert_file);
908 env->GetString(
"HttpClientKeyFile", m_x509_client_key_file);
913 return std::make_tuple(m_x509_client_cert_file, m_x509_client_key_file);
919 auto now = std::chrono::system_clock::now().time_since_epoch().count();
920 auto oldest_op = now;
921 auto oldest_cycle = now;
923 std::unique_lock lk(m_worker_stats_mutex);
924 for (
const auto &entry : m_workers_last_completed_cycle) {
925 if (!entry) {
continue;}
926 auto cycle = entry->load(std::memory_order_relaxed);
927 if (cycle < oldest_cycle) oldest_cycle = cycle;
929 for (
const auto &entry : m_workers_oldest_op) {
930 if (!entry) {
continue;}
931 auto op = entry->load(std::memory_order_relaxed);
932 if (op < oldest_op) oldest_op = op;
935 auto oldest_op_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_op)).time_since_epoch()).count();
936 auto oldest_cycle_dbl = std::chrono::duration<double>(std::chrono::system_clock::time_point(std::chrono::system_clock::duration(oldest_cycle)).time_since_epoch()).count();
937 std::string retval =
"{"
938 "\"oldest_op\":" + std::to_string(oldest_op_dbl) +
","
939 "\"oldest_cycle\":" + std::to_string(oldest_cycle_dbl) +
","
942 for (
size_t verb_idx = 0; verb_idx < static_cast<int>(XrdClHttp::CurlOperation::HttpVerb::Count); verb_idx++) {
944 for (
size_t op_idx = 0; op_idx < 402; op_idx++) {
945 if (op_idx == 401)
continue;
947 auto &op_stats = m_ops[verb_idx][op_idx];
948 auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
949 if (duration == 0)
continue;
951 std::string prefix =
"http_" + verb_str +
"_" + ((op_idx == 402) ?
"invalid" : std::to_string(200 + op_idx)) +
"_";
953 auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
954 retval +=
"\"" + prefix +
"duration\":" + std::to_string(duration_dbl) +
",";
956 duration = op_stats.m_pause_duration.load(std::memory_order_relaxed);
958 duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
959 retval +=
"\"" + prefix +
"pause_duration\":" + std::to_string(duration_dbl) +
",";
962 auto count = op_stats.m_bytes.load(std::memory_order_relaxed);
963 if (count) retval +=
"\"" + prefix +
"bytes\":" + std::to_string(count) +
",";
964 count = op_stats.m_error.load(std::memory_order_relaxed);
965 if (count) retval +=
"\"" + prefix +
"error\":" + std::to_string(count) +
",";
966 count = op_stats.m_finished.load(std::memory_order_relaxed);
967 if (count) retval +=
"\"" + prefix +
"finished\":" + std::to_string(count) +
",";
968 count = op_stats.m_client_timeout.load(std::memory_order_relaxed);
969 if (count) retval +=
"\"" + prefix +
"client_timeout\":" + std::to_string(count) +
",";
970 count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
971 if (count) retval +=
"\"" + prefix +
"server_timeout\":" + std::to_string(count) +
",";
974 auto &op_stats = m_ops[verb_idx][401];
975 auto duration = op_stats.m_duration.load(std::memory_order_relaxed);
976 if (duration == 0)
continue;
978 std::string prefix =
"http_" + verb_str +
"_";
980 auto duration_dbl = std::chrono::duration<double>(std::chrono::steady_clock::duration(duration)).count();
981 retval +=
"\"" + prefix +
"preheader_duration\":" + std::to_string(duration_dbl) +
",";
983 auto count = op_stats.m_started.load(std::memory_order_relaxed);
984 if (count) retval +=
"\"" + prefix +
"started\":" + std::to_string(count) +
",";
985 count = op_stats.m_error.load(std::memory_order_relaxed);
986 if (count) retval +=
"\"" + prefix +
"preheader_error\":" + std::to_string(count) +
",";
987 count = op_stats.m_finished.load(std::memory_order_relaxed);
988 if (count) retval +=
"\"" + prefix +
"preheader_finished\":" + std::to_string(count) +
",";
989 count = op_stats.m_server_timeout.load(std::memory_order_relaxed);
990 if (count) retval +=
"\"" + prefix +
"preheader_timeout\":" + std::to_string(count) +
",";
991 count = op_stats.m_conncall_timeout.load(std::memory_order_relaxed);
992 if (count) retval +=
"\"" + prefix +
"conncall_timeout\":" + std::to_string(count) +
",";
997 "\"conncall_error\":" + std::to_string(m_conncall_errors.load(std::memory_order_relaxed)) +
","
998 "\"conncall_started\":" + std::to_string(m_conncall_req.load(std::memory_order_relaxed)) +
","
999 "\"conncall_success\":" + std::to_string(m_conncall_success.load(std::memory_order_relaxed)) +
","
1000 "\"conncall_timeout\":" + std::to_string(m_conncall_timeout.load(std::memory_order_relaxed)) +
1013 if (sc < 0 || kind == OpKind::Start || sc == 100) {
1015 }
else if (sc < 200 || sc >= 600) {
1020 auto [bytes, pre_headers, post_headers, pause_duration] = op.
StatisticsReset();
1021 auto &op_stats = m_ops[
static_cast<int>(op.
GetVerb())][sc];
1022 op_stats.m_bytes.fetch_add(bytes, std::memory_order_relaxed);
1023 op_stats.m_duration.fetch_add((sc == 401) ? pre_headers.count() : post_headers.count(), std::memory_order_relaxed);
1024 op_stats.m_pause_duration.fetch_add(pause_duration.count(), std::memory_order_relaxed);
1025 if (pre_headers != std::chrono::steady_clock::duration::zero() && sc != 401) {
1026 auto &old_stats = m_ops[
static_cast<int>(op.
GetVerb())][401];
1027 old_stats.m_duration.fetch_add(pre_headers.count(), std::memory_order_relaxed);
1030 case OpKind::ConncallTimeout:
1031 op_stats.m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1033 case OpKind::ClientTimeout:
1034 op_stats.m_client_timeout.fetch_add(1, std::memory_order_relaxed);
1037 op_stats.m_error.fetch_add(1, std::memory_order_relaxed);
1039 case OpKind::Finish:
1040 op_stats.m_finished.fetch_add(1, std::memory_order_relaxed);
1043 op_stats.m_started.fetch_add(1, std::memory_order_relaxed);
1045 case OpKind::ServerTimeout:
1046 op_stats.m_server_timeout.fetch_add(1, std::memory_order_relaxed);
1048 case OpKind::Update:
1057 std::unique_lock lock(m_workers_mutex);
1058 m_workers.emplace_back(std::move(
self));
1059 m_self_tid = std::move(tid);
1061 std::unique_lock lock(m_start_lock);
1062 m_start_complete =
true;
1063 m_start_complete_cv.notify_one();
1070 std::unique_lock lock(myself->m_start_lock);
1071 myself->m_start_complete_cv.wait(lock, [&]{
return myself->m_start_complete;});
1078 std::unique_lock lock(m_workers_mutex);
1079 auto iter = std::remove_if(m_workers.begin(), m_workers.end(), [&](std::unique_ptr<XrdClHttp::CurlWorker> &worker){return worker.get() == myself;});
1080 m_workers.erase(iter);
1087 int max_pending = 50;
1090 auto &queue = *m_queue.get();
1093 CURLM *multi_handle = curl_multi_init();
1094 if (multi_handle ==
nullptr) {
1095 throw std::runtime_error(
"Failed to create curl multi-handle");
1098 int running_handles = 0;
1099 time_t last_maintenance = time(NULL);
1100 CURLMcode mres = CURLM_OK;
1104 std::unordered_map<int, WaitingForBroker> broker_reqs;
1105 std::vector<struct curl_waitfd> waitfds;
1107 bool want_shutdown =
false;
1108 while (!want_shutdown) {
1109 m_last_completed_cycle.store(std::chrono::system_clock::now().time_since_epoch().count());
1110 auto oldest_op = std::chrono::system_clock::now();
1111 for (
const auto &entry : m_op_map) {
1112 OpRecord(*entry.second.first, OpKind::Update);
1113 if (entry.second.second < oldest_op) {
1114 oldest_op = entry.second.second;
1117 m_oldest_op.store(oldest_op.time_since_epoch().count());
1121 auto op = m_continue_queue->TryConsume();
1128 m_logger->
Debug(
kLogXrdClHttp,
"Ignoring continuation of operation that has already completed");
1131 m_logger->
Debug(
kLogXrdClHttp,
"Continuing the curl handle from op %p on thread %d", op.get(), getthreadid());
1138 curl_multi_remove_handle(multi_handle, curl);
1139 curl_easy_cleanup(curl);
1140 m_op_map.erase(curl);
1142 running_handles -= 1;
1145 auto iter = m_op_map.find(curl);
1146 if (iter != m_op_map.end()) iter->second.second = std::chrono::system_clock::now();
1150 while (running_handles <
static_cast<int>(m_max_ops)) {
1151 auto op = running_handles == 0 ? queue.Consume(std::chrono::seconds(1)) : queue.TryConsume();
1155 auto curl = queue.GetHandle();
1156 if (curl ==
nullptr) {
1162 auto rv = op->
Setup(curl, *
this);
1183 m_op_map[curl] = {op, std::chrono::system_clock::now()};
1188 std::string modified_url;
1189 std::shared_ptr<CurlOptionsOp> options_op(
1201 curl = queue.GetHandle();
1202 if (curl ==
nullptr) {
1208 auto rv = options_op->Setup(curl, *
this);
1213 m_op_map[curl] = {options_op, std::chrono::system_clock::now()};
1214 OpRecord(*options_op, OpKind::Start);
1215 running_handles += 1;
1217 OpRecord(*op, OpKind::Start);
1220 auto mres = curl_multi_add_handle(multi_handle, curl);
1221 if (mres != CURLM_OK) {
1228 running_handles += 1;
1233 time_t now = time(NULL);
1234 time_t next_maintenance = last_maintenance + m_maintenance_period.load(std::memory_order_relaxed);
1235 if (now >= next_maintenance) {
1237 m_continue_queue->Expire();
1239 getthreadid(), running_handles);
1240 last_maintenance = now;
1243 std::vector<std::pair<int, CURL *>> expired_ops;
1244 for (
const auto &entry : broker_reqs) {
1245 if (entry.second.expiry < now) {
1246 expired_ops.emplace_back(entry.first, entry.second.curl);
1249 for (
const auto &entry : expired_ops) {
1250 auto iter = m_op_map.find(entry.second);
1251 if (iter == m_op_map.end()) {
1252 m_logger->
Warning(
kLogXrdClHttp,
"Found an expired curl handle with no corresponding operation!");
1256 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(iter->second.first.get())) !=
nullptr) {
1258 bool parent_op_failed =
false;
1259 if (parent_op->IsRedirect()) {
1261 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1263 if (iter != m_op_map.end()) {
1266 m_op_map.erase(iter);
1267 running_handles -= 1;
1269 parent_op_failed =
true;
1271 OpRecord(*parent_op, OpKind::Start);
1274 OpRecord(*parent_op, OpKind::Start);
1276 if (!parent_op_failed){
1282 iter->second.first->ReleaseHandle();
1283 OpRecord(*(iter->second.first), OpKind::ConncallTimeout);
1284 m_op_map.erase(entry.second);
1285 curl_easy_cleanup(entry.second);
1286 running_handles -= 1;
1288 broker_reqs.erase(entry.first);
1289 m_conncall_timeout.fetch_add(1, std::memory_order_relaxed);
1297 waitfds.resize(3 + broker_reqs.size());
1299 waitfds[0].fd = queue.PollFD();
1300 waitfds[0].events = CURL_WAIT_POLLIN;
1301 waitfds[0].revents = 0;
1302 waitfds[1].fd = m_continue_queue->PollFD();
1303 waitfds[1].events = CURL_WAIT_POLLIN;
1304 waitfds[1].revents = 0;
1305 waitfds[2].fd = m_shutdown_pipe_r;
1306 waitfds[2].revents = 0;
1307 waitfds[2].events = CURL_WAIT_POLLIN | CURL_WAIT_POLLPRI;
1310 for (
const auto &entry : broker_reqs) {
1311 waitfds[idx].fd = entry.first;
1312 waitfds[idx].events = CURL_WAIT_POLLIN|CURL_WAIT_POLLPRI;
1313 waitfds[idx].revents = 0;
1318 curl_multi_timeout(multi_handle, &timeo);
1322 if (running_handles && timeo == -1) {
1327 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50,
nullptr);
1333 mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50,
nullptr);
1335 if (mres != CURLM_OK) {
1340 for (
const auto &entry : waitfds) {
1342 if (waitfds[0].fd == entry.fd || waitfds[1].fd == entry.fd) {
1346 if ((waitfds[2].fd == entry.fd) && entry.revents) {
1347 want_shutdown =
true;
1350 if ((entry.revents & CURL_WAIT_POLLIN) != CURL_WAIT_POLLIN) {
1353 auto handle = broker_reqs[entry.fd].curl;
1354 auto iter = m_op_map.find(handle);
1355 if (iter == m_op_map.end()) {
1356 m_logger->
Warning(
kLogXrdClHttp,
"Internal error: broker responded on FD %d but no corresponding curl operation", entry.fd);
1357 broker_reqs.erase(entry.fd);
1358 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1362 auto result = iter->second.first->WaitSocketCallback(err);
1367 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(iter->second.first.get())) !=
nullptr) {
1369 bool parent_op_failed =
false;
1370 if (parent_op->IsRedirect()) {
1372 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1374 if (iter != m_op_map.end()) {
1377 m_op_map.erase(iter);
1378 running_handles -= 1;
1380 parent_op_failed =
true;
1382 OpRecord(*parent_op, OpKind::Start);
1385 OpRecord(*parent_op, OpKind::Start);
1387 if (!parent_op_failed){
1394 m_op_map.erase(handle);
1395 broker_reqs.erase(entry.fd);
1396 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1397 running_handles -= 1;
1399 broker_reqs.erase(entry.fd);
1400 curl_multi_add_handle(multi_handle, handle);
1401 m_conncall_success.fetch_add(1, std::memory_order_relaxed);
1407 auto mres = curl_multi_perform(multi_handle, &still_running);
1408 if (mres == CURLM_CALL_MULTI_PERFORM) {
1410 }
else if (mres != CURLM_OK) {
1418 msg = curl_multi_info_read(multi_handle, &msgq);
1419 if (msg && (msg->msg == CURLMSG_DONE)) {
1420 if (!msg->easy_handle) {
1422 mres = CURLM_BAD_EASY_HANDLE;
1425 auto iter = m_op_map.find(msg->easy_handle);
1426 if (iter == m_op_map.end()) {
1427 m_logger->
Error(
kLogXrdClHttp,
"Logic error: got a callback for an entry that doesn't exist");
1428 mres = CURLM_BAD_EASY_HANDLE;
1431 auto op = iter->second.first;
1432 auto res = msg->data.result;
1433 bool keep_handle =
false;
1434 bool waiting_on_callout =
false;
1435 if (res == CURLE_OK) {
1437 OpRecord(*op, OpKind::Finish);
1446 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(op.get())) !=
nullptr) {
1448 bool parent_op_failed =
false;
1449 if (parent_op->IsRedirect()) {
1451 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1454 running_handles -= 1;
1455 parent_op_failed =
true;
1457 OpRecord(*parent_op, OpKind::Start);
1460 OpRecord(*parent_op, OpKind::Start);
1463 if (!parent_op_failed) {
1468 queue.RecycleHandle(iter->first);
1472 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(op.get()))) {
1478 OpRecord(*op, OpKind::Start);
1480 curl_multi_remove_handle(multi_handle, iter->first);
1481 queue.RecycleHandle(iter->first);
1489 case CurlOperation::RedirectAction::Fail:
1498 keep_handle =
false;
1500 case CurlOperation::RedirectAction::Reinvoke:
1506 OpRecord(*op, OpKind::Start);
1509 case CurlOperation::RedirectAction::ReinvokeAfterAllow:
1516 std::string modified_url;
1519 std::shared_ptr<CurlOperation> new_op(options_op);
1520 auto curl = queue.GetHandle();
1521 if (curl ==
nullptr) {
1524 keep_handle =
false;
1525 options_op =
nullptr;
1528 OpRecord(*new_op, OpKind::Start);
1530 auto rv = new_op->Setup(curl, *
this);
1533 keep_handle =
false;
1534 options_op =
nullptr;
1538 m_logger->
Debug(
kLogXrdClHttp,
"Unable to setup the curl handle for the OPTIONS operation");
1539 new_op->Fail(
XrdCl::errInternal, ENOMEM,
"Failed to setup the curl handle for the OPTIONS operation");
1541 keep_handle =
false;
1544 new_op->SetContinueQueue(m_continue_queue);
1545 m_op_map[curl] = {new_op, std::chrono::system_clock::now()};
1546 auto mres = curl_multi_add_handle(multi_handle, curl);
1547 if (mres != CURLM_OK) {
1548 m_logger->
Debug(
kLogXrdClHttp,
"Unable to add OPTIONS operation to the curl multi-handle: %s", curl_multi_strerror(mres));
1553 running_handles += 1;
1554 m_logger->
Debug(
kLogXrdClHttp,
"Invoking the OPTIONS operation before redirect to %s", target.c_str());
1561 if ((waiting_on_callout = callout_socket >= 0)) {
1562 auto expiry = time(
nullptr) + 20;
1563 m_logger->
Debug(
kLogXrdClHttp,
"Creating a callout wait request on socket %d", callout_socket);
1564 broker_reqs[callout_socket] = {iter->first, expiry};
1565 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1567 }
else if (options_op) {
1572 curl_multi_remove_handle(multi_handle, iter->first);
1573 if (!waiting_on_callout && !options_op) {
1574 curl_multi_add_handle(multi_handle, iter->first);
1576 }
else if (!options_op) {
1580 queue.RecycleHandle(iter->first);
1589 int wait_socket = -1;
1591 m_logger->
Error(
kLogXrdClHttp,
"Failed to start broker-based connection: %s", err.c_str());
1593 keep_handle =
false;
1595 curl_multi_remove_handle(multi_handle, iter->first);
1596 auto expiry = time(
nullptr) + 20;
1597 m_logger->
Debug(
kLogXrdClHttp,
"Curl operation requires a new TCP socket; waiting for callout to respond on socket %d", wait_socket);
1598 broker_reqs[wait_socket] = {iter->first, expiry};
1599 m_conncall_req.fetch_add(1, std::memory_order_relaxed);
1602 if (res == CURLE_ABORTED_BY_CALLBACK || res == CURLE_WRITE_ERROR) {
1606 case CurlOperation::OpError::ErrHeaderTimeout:
1607 #ifdef HAVE_XPROTOCOL_TIMEREXPIRED
1614 case CurlOperation::OpError::ErrCallback: {
1620 case CurlOperation::OpError::ErrOperationTimeout:
1622 OpRecord(*op, op->
IsPaused() ? OpKind::ClientTimeout : OpKind::ServerTimeout);
1624 case CurlOperation::OpError::ErrTransferSlow:
1626 OpRecord(*op, OpKind::ServerTimeout);
1628 case CurlOperation::OpError::ErrTransferClientStall:
1630 OpRecord(*op, OpKind::ClientTimeout);
1632 case CurlOperation::OpError::ErrTransferStall:
1634 OpRecord(*op, OpKind::ServerTimeout);
1636 case CurlOperation::OpError::ErrNone:
1642 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(op.get())) !=
nullptr) {
1644 bool parent_op_failed =
false;
1645 if (parent_op->IsRedirect()) {
1647 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1649 if (iter != m_op_map.end()) {
1652 m_op_map.erase(iter);
1653 running_handles -= 1;
1655 parent_op_failed =
true;
1657 OpRecord(*parent_op, OpKind::Start);
1660 OpRecord(*parent_op, OpKind::Start);
1662 if (!parent_op_failed){
1669 const char *curl_easy_err = curl_easy_strerror(res);
1670 const std::string fail_err = !curl_err.empty() ? curl_err : curl_easy_err;
1671 m_logger->
Debug(
kLogXrdClHttp,
"Curl generated an error: %s (%d)", fail_err.c_str(), res);
1672 op->
Fail(xrdCode.first, xrdCode.second, fail_err);
1675 if ((options_op =
dynamic_cast<CurlOptionsOp*
>(op.get())) !=
nullptr) {
1677 bool parent_op_failed =
false;
1678 if (parent_op->IsRedirect()) {
1680 if (parent_op->Redirect(target) == CurlOperation::RedirectAction::Fail) {
1682 if (iter != m_op_map.end()) {
1685 m_op_map.erase(iter);
1686 running_handles -= 1;
1688 parent_op_failed =
true;
1691 if (!parent_op_failed){
1699 curl_multi_remove_handle(multi_handle, iter->first);
1700 if (res != CURLE_OK) {
1701 curl_easy_cleanup(iter->first);
1703 for (
auto &req : broker_reqs) {
1704 if (req.second.curl == iter->first) {
1705 m_logger->
Warning(
kLogXrdClHttp,
"Curl handle finished while a broker operation was outstanding");
1706 m_conncall_errors.fetch_add(1, std::memory_order_relaxed);
1709 m_op_map.erase(iter);
1710 running_handles -= 1;
1716 for (
auto map_entry : m_op_map) {
1721 if (multi_handle && map_entry.first) curl_multi_remove_handle(multi_handle, map_entry.first);
1724 m_queue->ReleaseHandles();
1725 curl_multi_cleanup(multi_handle);
1729 CurlWorker::Shutdown()
1731 m_queue->Shutdown();
1732 if (m_shutdown_pipe_w == -1) {
1736 close(m_shutdown_pipe_w);
1737 m_shutdown_pipe_w = -1;
1743 std::unique_lock lk(m_worker_stats_mutex);
1744 m_workers_last_completed_cycle[m_stats_offset] =
nullptr;
1745 m_workers_oldest_op[m_stats_offset] =
nullptr;
1751 CurlWorker::ShutdownAll()
1753 std::unique_lock lock(m_workers_mutex);
1754 for (
auto &worker : m_workers) {
1759 CurlWorker::initcontrol::initcontrol()
1761 curl_global_init(CURL_GLOBAL_DEFAULT);
1764 CurlWorker::initcontrol::~initcontrol()
1767 curl_global_cleanup();
std::pair< uint16_t, uint32_t > CurlCodeConvert(CURLcode res)
std::string obfuscateAuth(const std::string &input)
ssize_t read(int fildes, void *buf, size_t nbyte)
void getline(uchar *buff, int blen)
int emsg(int rc, char *msg)
bool Set(ChecksumType ctype, const std::array< unsigned char, g_max_checksum_length > &value)
virtual void OptionsDone()
int GetStatusCode() const
bool FinishSetup(CURL *curl)
const std::string & GetUrl() const
std::pair< XErrorCode, std::string > GetCallbackError() const
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
static const std::string GetVerbString(HttpVerb)
bool UseConnectionCallout()
virtual HttpVerb GetVerb() const =0
std::string GetCurlErrorMessage() const
virtual void ReleaseHandle()
virtual bool RequiresOptions() const
static void CleanupDnsCache()
bool GetTriedBoker() const
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
virtual bool ContinueHandle()
std::string GetStatusMessage() const
CURL * GetCurlHandle() const
CreateConnCalloutType GetConnCalloutFunc() const
virtual RedirectAction Redirect(std::string &target)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue)
bool StartConnectionCallout(std::string &err)
virtual bool Setup(CURL *curl, CurlWorker &)
CURL * GetParentCurlHandle() const
void ReleaseHandle() override
std::shared_ptr< CurlOperation > GetOperation() const
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
static void RunStatic(CurlWorker *myself)
void Start(std::unique_ptr< XrdClHttp::CurlWorker > self, std::thread tid)
static std::string GetMonitoringJson()
HandlerQueue(unsigned max_pending_ops)
void RecycleHandle(CURL *)
static std::string_view GetUrlKey(const std::string &url, std::string &modified_url)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
@ DumpMsg
print details of the request and responses
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
CURL * GetHandle(bool verbose)
bool HTTPStatusIsError(unsigned status)
std::string_view ltrim_view(const std::string_view &input_view)
std::string_view trim_view(const std::string_view &input_view)
const uint16_t errUnknown
Unknown error.
const uint16_t errInvalidAddr
const uint16_t errRedirectLimit
const uint16_t errErrorResponse
const uint16_t errTlsError
const uint16_t errOperationExpired
const uint16_t errLoginFailed
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t errInvalidArgs
const uint16_t errConnectionError
const uint16_t errNotSupported
const uint16_t errSocketError
const uint16_t errCorruptedHeader
const uint16_t errNone
No error.
const uint64_t kLogXrdClHttp