XRootD
XrdClHttpOps.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpOps.hh"
22 #include "XrdClHttpResponses.hh"
23 #include "XrdClHttpUtil.hh"
24 #include "XrdClHttpWorker.hh"
25 
26 #include <XrdCl/XrdClDefaultEnv.hh>
27 #include <XrdCl/XrdClLog.hh>
29 
30 #include <arpa/inet.h>
31 #include <unistd.h>
32 #include <chrono>
33 #include <cmath>
34 #ifdef __APPLE__
35 #include <stdlib.h>
36 #else
37 #include <sys/random.h>
38 #endif
39 #include <utility>
40 
41 using namespace XrdClHttp;
42 
43 std::chrono::steady_clock::duration CurlOperation::m_stall_interval{CurlOperation::m_default_stall_interval};
45 
46 namespace {
47 
48 // For connection callbacks, we don't want to require a real DNS lookup; instead, we
49 // will generate a fake address in the 169.254.x.y range and use that for the connection.
50 // This will be fed to libcurl via the CURLOPT_RESOLVE option, which will bypass DNS lookups.
51 
52 // A randomized counter for generating fake addresses in the 169.254.x.y range
// Per-thread counter; the value -1 means "not seeded yet" (see GenerateFakeEndpoint).
thread_local int64_t fake_dns_counter{-1};
54 
55 // Map from hostname:port to fake address (e.g., 169.254.x.y:port) and
56 // std::string pointer for the fake address.
57 // We must track the std::string pointer so we can pass it to libcurl
58 // as the CURLOPT_CLOSESOCKETDATA, which is passed to the close socket callback; the
59 // lifetime of the pointer must be at least as long as the lifetime of the socket
60 // (we maintain a reference count manually below).
// Value stored in both lookup tables: the peer string (fake address or
// hostname:port, depending on direction) plus the non-owning pointer that
// identifies the refcount entry.
using FakeDnsEntry = std::pair<std::string, std::string *>;

thread_local std::unordered_map<std::string, FakeDnsEntry> fake_dns_map{};

// Reverse map from fake address (e.g., 169.254.x.y:port) to hostname:port and reference pointer
thread_local std::unordered_map<std::string, FakeDnsEntry> reverse_fake_dns_map{};
65 
66 // References to fake addresses in use. The value is a reference count of sockets using
67 // this address; when the count goes to zero, we can remove the entry from the above maps.
68 // The second member is a unique_ptr to the std::string for the fake address, which will be
69 // cleaned up when the refcount goes to zero.
struct refcount_entry {
    // Number of sockets currently using the fake address.
    int count;
    // Owns the address string whose raw pointer is used as the lookup key.
    std::unique_ptr<std::string> addr;
    // Time the entry was created or last handed to a socket.
    std::chrono::steady_clock::time_point last_used;

    // True once strictly more than one minute separates `last_used` and `now`.
    bool IsExpired(std::chrono::steady_clock::time_point now) const {
        const auto idle = now - last_used;
        return idle > std::chrono::minutes(1);
    }
};
79 
80 thread_local std::unordered_map<std::string *, std::unique_ptr<refcount_entry>> fake_dns_refcount;
81 
82 std::string GenerateFakeEndpoint() {
83  if (fake_dns_counter == -1) {
84 #ifdef __APPLE__
85  fake_dns_counter = arc4random();
86 #else
87  errno = 0;
88  while (fake_dns_counter < 0 || errno == EINTR) {
89  if (getrandom((void*)&fake_dns_counter, sizeof(fake_dns_counter), 0) == sizeof(fake_dns_counter)) {
90  break;
91  }
92  }
93 #endif
94  }
95  uint64_t addr = static_cast<uint64_t>(fake_dns_counter);
96  uint32_t class_d = addr & 0xff;
97  uint32_t class_c = (addr >> 8) & 0xff;
98  uint32_t port = 1024 + ((addr >> 16) % (65535 - 1024));
99  fake_dns_counter++;
100 
101  return std::string("169.254.") + std::to_string(class_c) + "." + std::to_string(class_d) + ":" + std::to_string(port);
102 }
103 
104 std::string *GetFakeEndpointForHost(const std::string &host, int port) {
105  std::string key = host + ":" + std::to_string(port);
106  auto it = fake_dns_map.find(key);
107  if (it != fake_dns_map.end()) {
108  return it->second.second;
109  }
110  auto addr = GenerateFakeEndpoint();
111  if (reverse_fake_dns_map.find(addr) != reverse_fake_dns_map.end()) {
112  return nullptr; // Collision, out of addresses.
113  }
114  auto addr_ptr_raw = new std::string(addr);
115  std::unique_ptr<std::string> addr_ptr(addr_ptr_raw);
116  fake_dns_map[key] = {addr, addr_ptr.get()};
117  reverse_fake_dns_map[addr] = {key, addr_ptr.get()};
118  std::unique_ptr<refcount_entry> new_entry(new refcount_entry{0, std::move(addr_ptr), std::chrono::steady_clock::now()});
119  fake_dns_refcount[addr_ptr_raw] = std::move(new_entry);
120  return addr_ptr_raw;
121 }
122 
// Split a URL (or a bare "host[:port]" string) into host and port.
//
// - The scheme, when present, selects the default port: 443 for "https" and
//   "davs", 80 for everything else (including no scheme).
// - Only the authority component is examined; it ends at the first '/', '?'
//   or '#', so an '@' or ':' in the path, query or fragment is never mistaken
//   for part of the host.
// - Any "userinfo@" prefix is discarded.
// - A bracketed IPv6 literal is returned with its brackets, e.g. "[::1]".
// - A missing or unparsable port yields the default port.
std::pair<std::string, int> ParseHostPort(const std::string &location) {
    const auto scheme_end = location.find("://");
    const bool has_scheme = scheme_end != std::string::npos;
    const std::string schema = has_scheme ? location.substr(0, scheme_end) : std::string();
    const int std_port = (schema == "https" || schema == "davs") ? 443 : 80;

    // Isolate the authority before looking for '@' or ':'.
    std::string authority = has_scheme ? location.substr(scheme_end + 3) : location;
    const auto authority_end = authority.find_first_of("/?#");
    if (authority_end != std::string::npos) {
        authority.erase(authority_end);
    }

    // The host starts after the last '@' of the authority.
    const auto at_pos = authority.rfind('@');
    const std::string hostport = (at_pos == std::string::npos) ? authority : authority.substr(at_pos + 1);

    std::string::size_type colon = std::string::npos;
    if (!hostport.empty() && hostport.front() == '[') {
        // IPv6 literal: the only colon that separates the port follows ']'.
        const auto close = hostport.find(']');
        if (close == std::string::npos) {
            return {hostport, std_port}; // Malformed literal; pass it through untouched.
        }
        if (close + 1 >= hostport.size() || hostport[close + 1] != ':') {
            return {hostport.substr(0, close + 1), std_port};
        }
        colon = close + 1;
    } else {
        colon = hostport.find(':');
    }
    if (colon == std::string::npos) {
        return {hostport, std_port};
    }

    int port = std_port;
    try {
        port = std::stoi(hostport.substr(colon + 1));
    } catch (...) {
        port = std_port; // Empty or non-numeric port: fall back to the scheme default.
    }
    return {hostport.substr(0, colon), port};
}
146 
// Rewrite a WebDAV scheme to its HTTP equivalent: "dav://" becomes "http://"
// and "davs://" becomes "https://". Any other URL is returned unchanged.
std::string DavToHttp(const std::string &url) {
    const bool plain_dav = url.rfind("dav://", 0) == 0;
    if (plain_dav) {
        return "http://" + url.substr(6);
    }
    const bool secure_dav = url.rfind("davs://", 0) == 0;
    return secure_dav ? "https://" + url.substr(7) : url;
}
156 
157 } // namespace
158 
// Convert a relative timeout into an absolute steady-clock deadline.
// An all-zero timespec is treated as "unset" and maps to 30 seconds from now.
std::chrono::steady_clock::time_point CalculateExpiry(struct timespec timeout) {
    const bool unset = (timeout.tv_sec == 0) && (timeout.tv_nsec == 0);
    if (unset) {
        return std::chrono::steady_clock::now() + std::chrono::seconds(30);
    }
    const auto start = std::chrono::steady_clock::now();
    return start + std::chrono::seconds(timeout.tv_sec) + std::chrono::nanoseconds(timeout.tv_nsec);
}
165 
166 CurlOperation::CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url,
167  struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
168  HeaderCallout *header_callout) :
169  CurlOperation::CurlOperation(handler, url, CalculateExpiry(timeout), logger, callout, header_callout)
170  {}
171 
172 CurlOperation::CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url,
173  std::chrono::steady_clock::time_point expiry, XrdCl::Log *logger,
174  CreateConnCalloutType callout, HeaderCallout *header_callout) :
175  m_header_expiry(expiry),
176  m_header_callout(header_callout),
177  m_last_reset(std::chrono::steady_clock::now()),
178  m_last_header_reset(m_last_reset),
179  m_start_op(m_last_reset),
180  m_header_start(m_last_reset),
181  m_conn_callout(callout),
182  m_url(DavToHttp(url)),
183  m_handler(handler),
184  m_curl(nullptr, &curl_easy_cleanup),
185  m_logger(logger)
186  {}
187 
188 CurlOperation::~CurlOperation() {}
189 
190 void
191 CurlOperation::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
192 {
193  SetDone(true);
194  if (m_handler == nullptr) {return;}
195  if (!msg.empty()) {
196  m_logger->Debug(kLogXrdClHttp, "curl operation failed with message: %s", msg.c_str());
197  } else {
198  m_logger->Debug(kLogXrdClHttp, "curl operation failed with status code %d", errNum);
199  }
200  auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, msg);
201  auto handle = m_handler;
202  m_handler = nullptr;
203  handle->HandleResponse(status, nullptr);
204 }
205 
206 int
207 CurlOperation::FailCallback(XErrorCode ecode, const std::string &emsg) {
208  m_callback_error_code = ecode;
209  m_callback_error_str = emsg;
210  m_error = OpError::ErrCallback;
211  m_logger->Debug(kLogXrdClHttp, "%s", emsg.c_str());
212  return 0;
213 }
214 
215 bool
216 CurlOperation::FinishSetup(CURL *curl)
217 {
218  if (!m_header_callout) {
219  m_header_slist.reset();
220  for (const auto &header : m_headers_list) {
221  m_header_slist.reset(curl_slist_append(m_header_slist.release(),
222  (header.first + ": " + header.second).c_str()));
223  }
224  return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
225  }
226  const auto &verb = GetVerbString(GetVerb());
227 
228  auto extra_headers = m_header_callout->GetHeaders(verb, m_url, m_headers_list);
229  if (!extra_headers) {
230  m_logger->Error(kLogXrdClHttp, "Failed to get headers from header callout for %s", m_url.c_str());
231  return false;
232  }
233  m_header_slist.reset();
234  for (const auto &header : *extra_headers) {
235  if (!strcasecmp(header.first.c_str(), "Content-Length")) {
236  auto upload_size = std::stoull(header.second);
237  curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, upload_size);
238  continue;
239  }
240  m_header_slist.reset(curl_slist_append(m_header_slist.release(),
241  (header.first + ": " + header.second).c_str()));
242  }
243  return curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_slist.get()) == CURLE_OK;
244 }
245 
246 const std::string
247 CurlOperation::GetVerbString(CurlOperation::HttpVerb verb)
248 {
249  switch (verb) {
250  case HttpVerb::COPY:
251  return "COPY";
252  case HttpVerb::DELETE:
253  return "DELETE";
254  case HttpVerb::GET:
255  return "GET";
256  case HttpVerb::HEAD:
257  return "HEAD";
258  case HttpVerb::MKCOL:
259  return "MKCOL";
260  case HttpVerb::OPTIONS:
261  return "OPTIONS";
262  case HttpVerb::PROPFIND:
263  return "PROPFIND";
264  case HttpVerb::PUT:
265  return "PUT";
266  case HttpVerb::Count:
267  return "UNKNOWN";
268  }
269  return "UNKNOWN";
270 }
271 
272 size_t
273 CurlOperation::HeaderCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
274 {
275  std::string header(buffer, size * nitems);
276  auto me = static_cast<CurlOperation*>(this_ptr);
277  auto now = std::chrono::steady_clock::now();
278  if (!me->m_received_header) {
279  me->m_received_header = true;
280  me->m_header_start = now;
281  }
282  me->m_header_lastop = now;
283  auto rv = me->Header(header);
284  return rv ? (size * nitems) : 0;
285 }
286 
287 bool
288 CurlOperation::Header(const std::string &header)
289 {
290  auto result = m_headers.Parse(header);
291  // m_logger->Debug(kLogXrdClHttp, "Got header: %s", header.c_str());
292  if (!result) {
293  m_logger->Debug(kLogXrdClHttp, "Failed to parse response header: %s", header.c_str());
294  }
295  if (m_headers.HeadersDone()) {
296  if (!m_response_info) {
297  m_response_info.reset(new ResponseInfo());
298  }
299  m_response_info->AddResponse(m_headers.MoveHeaders());
300  }
301  return result;
302 }
303 
304 CurlOperation::RedirectAction
305 CurlOperation::Redirect(std::string &target)
306 {
307  m_callout.reset();
308  m_conn_callout_result = -1;
309  m_conn_callout_listener = -1;
310  m_tried_broker = false;
311 
312  auto location = m_headers.GetLocation();
313  if (location.empty()) {
314  m_logger->Warning(kLogXrdClHttp, "After request to %s, server returned a redirect with no new location", m_url.c_str());
315  Fail(XrdCl::errErrorResponse, kXR_ServerError, "Server returned redirect without updated location");
316  return RedirectAction::Fail;
317  }
318  if (location.size() && location[0] == '/') { // hostname not included in the location - redirect to self.
319  std::string_view orig_url(m_url);
320  auto scheme_loc = orig_url.find("://");
321  if (scheme_loc == std::string_view::npos) {
322  Fail(XrdCl::errErrorResponse, kXR_ServerError, "Server returned a location with unknown hostname");
323  return RedirectAction::Fail;
324  }
325  auto path_loc = orig_url.find('/', scheme_loc + 3);
326  if (path_loc == std::string_view::npos) {
327  location = m_url + location;
328  } else {
329  location = std::string(orig_url.substr(0, path_loc)) + location;
330  }
331  }
332  m_logger->Debug(kLogXrdClHttp, "Request for %s redirected to %s", m_url.c_str(), location.c_str());
333  target = location;
334  curl_easy_setopt(m_curl.get(), CURLOPT_URL, location.c_str());
335  int disable_x509;
336  auto env = XrdCl::DefaultEnv::GetEnv();
337  if (env->GetInt("HttpDisableX509", disable_x509) && !disable_x509) {
338  std::string cert, key;
339  env->GetString("HttpClientCertFile", cert);
340  env->GetString("HttpClientKeyFile", key);
341  if (!cert.empty())
342  curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
343  if (!key.empty())
344  curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, key.c_str());
345  }
346  m_headers = HeaderParser();
347 
348  if (m_conn_callout) {
349  auto conn_callout = m_conn_callout(location, *m_response_info);
350  if (conn_callout != nullptr) {
351 
352  auto [host, port] = ParseHostPort(location);
353  if (host.empty() || port == -1) {
354  Fail(XrdCl::errInternal, 0, "Failed to parse host and port from URL " + location);
355  return RedirectAction::Fail;
356  }
357  auto fake_addr = GetFakeEndpointForHost(host, port);
358  if (!fake_addr || fake_addr->empty()) {
359  Fail(XrdCl::errInternal, 0, "Failed to generate a fake address for host " + host);
360  return RedirectAction::Fail;
361  }
362  m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
363  (host + ":" + std::to_string(port) + ":" + *fake_addr).c_str()));
364  m_logger->Debug(kLogXrdClHttp, "For connection callout in redirect, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
365 
366  m_callout.reset(conn_callout);
367  std::string err;
368  SetTriedBoker();
369  if ((m_conn_callout_listener = m_callout->BeginCallout(err, m_header_expiry)) == -1) {
370  auto errMsg = "Failed to start a connection callout request: " + err;
371  Fail(XrdCl::errInternal, 0, errMsg.c_str());
372  return RedirectAction::Fail;
373  }
374  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
375  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
376  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, this);
377  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
378  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
379  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, this);
380  curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
381  }
382  }
383  m_received_header = false;
384 
385  m_last_header_reset = m_last_reset = m_header_start = m_start_op = m_header_lastop = std::chrono::steady_clock::now();
386  return RedirectAction::Reinvoke;
387 }
388 
389 namespace {
390 
// Write callback that discards the data: it reports every byte as consumed
// without touching the buffer.
size_t
NullCallback(char * /*buffer*/, size_t size, size_t nitems, void * /*this_ptr*/)
{
    const size_t consumed = size * nitems;
    return consumed;
}
396 
397 }
398 
399 void
400 CurlOperation::SetPaused(bool paused) {
401  m_is_paused = paused;
402  if (m_is_paused) {
403  m_pause_start = std::chrono::steady_clock::now();
404  } else if (m_pause_start != std::chrono::steady_clock::time_point{}) {
405  m_pause_duration += std::chrono::steady_clock::now() - m_pause_start;
406  m_pause_start = std::chrono::steady_clock::time_point{};
407  }
408 }
409 
410 bool
411 CurlOperation::StartConnectionCallout(std::string &err)
412 {
413  if ((m_conn_callout_listener = m_callout->BeginCallout(err, m_header_expiry)) == -1) {
414  err = "Failed to start a callout for a socket connection: " + err;
415  Fail(XrdCl::errInternal, 1, err.c_str());
416  return false;
417  }
418  return true;
419 }
420 
421 std::tuple<uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration>
422 CurlOperation::StatisticsReset() {
423  auto now = std::chrono::steady_clock::now();
424  std::chrono::steady_clock::duration pre_header{}, post_header{}, pause_duration{};
425  if (m_received_header) {
426  if (m_last_header_reset < m_header_start) {
427  pre_header = m_header_start - m_last_header_reset;
428  m_last_header_reset = m_header_start;
429  }
430  post_header = now - ((m_last_reset < m_header_start) ? m_header_start : m_last_reset);
431  m_last_reset = now;
432  } else {
433  pre_header = now - m_last_header_reset;
434  m_last_header_reset = now;
435  }
436  if (IsPaused()) {
437  m_pause_duration += now - m_pause_start;
438  m_pause_start = now;
439  }
440  if (m_pause_duration != std::chrono::steady_clock::duration::zero()) {
441  pause_duration = m_pause_duration;
442  m_pause_duration = std::chrono::steady_clock::duration::zero();
443  }
444  auto bytes = m_bytes;
445  m_bytes = 0;
446  return {bytes, pre_header, post_header, pause_duration};
447 }
448 
449 bool
450 CurlOperation::HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now) {
451  if (m_received_header) return false;
452 
453  if (now > m_header_expiry) {
454  if (m_error == OpError::ErrNone) m_error = OpError::ErrHeaderTimeout;
455  return true;
456  }
457  return false;
458 }
459 
460 bool
461 CurlOperation::OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now) {
462  if (m_operation_expiry == std::chrono::steady_clock::time_point{} ||
463  !m_received_header) {
464  return false;
465  }
466 
467  if (now > m_operation_expiry) {
468  if (m_error == OpError::ErrNone) m_error = OpError::ErrOperationTimeout;
469  return true;
470  }
471  return false;
472 }
473 
474 bool
475 CurlOperation::TransferStalled(uint64_t xfer, const std::chrono::steady_clock::time_point &now)
476 {
477  // First, check to see how long it's been since any data was sent.
478  if (m_last_xfer == std::chrono::steady_clock::time_point()) {
479  m_last_xfer = m_header_lastop;
480  }
481  auto elapsed = now - m_last_xfer;
482  uint64_t xfer_diff = 0;
483  if (xfer > m_last_xfer_count) {
484  xfer_diff = xfer - m_last_xfer_count;
485  m_last_xfer_count = xfer;
486  m_last_xfer = now;
487  }
488 
489  // If progress is made in this callback do not classify as stalled
490  if (elapsed > m_stall_interval && xfer_diff == 0) {
491  if (m_error == OpError::ErrNone) m_error = IsPaused() ? OpError::ErrTransferClientStall : OpError::ErrTransferStall;
492  return true;
493  }
494 
495  // Curl updated us with new timing but the byte count hasn't changed; no need to update the EMA.
496  if (xfer_diff == 0) {
497  return false;
498  }
499 
500  // If the transfer is not stalled, then we check to see if the exponentially-weighted
501  // moving average of the transfer rate is below the minimum.
502 
503  // If the stall interval since the last header hasn't passed, then we don't check for slow transfers.
504  auto elapsed_since_last_headerop = now - m_header_lastop;
505  if (elapsed_since_last_headerop < m_stall_interval) {
506  return false;
507  } else if (m_ema_rate < 0) {
508  m_ema_rate = xfer / std::chrono::duration<double>(elapsed_since_last_headerop).count();
509  }
510  // Calculate the exponential moving average of the transfer rate.
511  double elapsed_seconds = std::chrono::duration<double>(elapsed).count();
512  auto recent_rate = static_cast<double>(xfer_diff) / elapsed_seconds;
513  auto alpha = 1.0 - exp(-elapsed_seconds / std::chrono::duration<double>(m_stall_interval).count());
514  m_ema_rate = (1.0 - alpha) * m_ema_rate + alpha * recent_rate;
515  if (m_ema_rate < static_cast<double>(m_minimum_rate)) {
516  if (m_error == OpError::ErrNone) m_error = OpError::ErrTransferSlow;
517  return true;
518  }
519  return false;
520 }
521 
522 bool
523 CurlOperation::Setup(CURL *curl, CurlWorker &worker)
524 {
525  if (curl == nullptr) {
526  throw std::runtime_error("Unable to setup curl operation with no handle");
527  }
528  struct timespec now;
529  if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
530  throw std::runtime_error("Unable to get current time");
531  }
532 
533  m_pause_start = {};
534  m_last_header_reset = m_last_reset = m_start_op = m_header_start = m_header_lastop = std::chrono::steady_clock::now();
535 
536  m_curl.reset(curl);
537  m_curl_error_buffer[0] = '\0';
538  curl_easy_setopt(m_curl.get(), CURLOPT_URL, m_url.c_str());
539  curl_easy_setopt(m_curl.get(), CURLOPT_ERRORBUFFER, m_curl_error_buffer);
540  curl_easy_setopt(m_curl.get(), CURLOPT_HEADERFUNCTION, CurlStatOp::HeaderCallback);
541  curl_easy_setopt(m_curl.get(), CURLOPT_HEADERDATA, this);
542  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, NullCallback);
543  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
544  curl_easy_setopt(m_curl.get(), CURLOPT_XFERINFOFUNCTION, CurlOperation::XferInfoCallback);
545  curl_easy_setopt(m_curl.get(), CURLOPT_XFERINFODATA, this);
546  curl_easy_setopt(m_curl.get(), CURLOPT_NOPROGRESS, 0L);
547  // Note: libcurl is not threadsafe unless this option is set.
548  // Before we set it, we saw deadlocks (and partial deadlocks) in practice.
549  curl_easy_setopt(m_curl.get(), CURLOPT_NOSIGNAL, 1L);
550 
551  m_parsed_url.reset(new XrdCl::URL(m_url));
552  auto env = XrdCl::DefaultEnv::GetEnv();
553  int disable_x509;
554  if ((env->GetInt("HttpDisableX509", disable_x509) && !disable_x509)) {
555  auto [cert, key] = worker.ClientX509CertKeyFile();
556  if (!cert.empty()) {
557  m_logger->Debug(kLogXrdClHttp, "Using client X.509 credential found at %s", cert.c_str());
558  curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, cert.c_str());
559  if (key.empty()) {
560  m_logger->Error(kLogXrdClHttp, "X.509 client credential specified but not the client key");
561  } else {
562  curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, key.c_str());
563  }
564  }
565  }
566 
567  if (m_conn_callout) {
568  ResponseInfo info;
569  auto callout = m_conn_callout(m_url, info);
570  if (callout) {
571  m_callout.reset(callout);
572  m_conn_callout_listener = -1;
573  m_conn_callout_result = -1;
574  m_tried_broker = false;
575 
576  auto [host, port] = ParseHostPort(m_url);
577  if (host.empty() || port == -1) {
578  throw std::runtime_error ("Failed to parse host and port from URL " + m_url);
579  }
580  auto fake_addr = GetFakeEndpointForHost(host, port);
581  if (!fake_addr || fake_addr->empty()) {
582  throw std::runtime_error("Failed to generate a fake address for host " + host);
583  }
584  m_resolve_slist.reset(curl_slist_append(m_resolve_slist.release(),
585  (host + ":" + std::to_string(port) + ":" + *fake_addr).c_str()));
586  m_logger->Debug(kLogXrdClHttp, "For connection callout in operation setup, mapping %s:%d -> %s", host.c_str(), port, fake_addr->c_str());
587 
588  curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, m_resolve_slist.get());
589 
590  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, CurlOperation::OpenSocketCallback);
591  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, CurlOperation::CloseSocketCallback);
592  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, this);
593  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, fake_addr);
594  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, CurlOperation::SockOptCallback);
595  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, this);
596  }
597  }
598 
599  return true;
600 }
601 
602 void
603 CurlOperation::ReleaseHandle()
604 {
605  m_conn_callout_listener = -1;
606  m_conn_callout_result = -1;
607  m_tried_broker = false;
608  m_callout.reset();
609 
610  if (m_curl == nullptr) return;
611  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
612  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETFUNCTION, nullptr);
613  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
614  curl_easy_setopt(m_curl.get(), CURLOPT_CLOSESOCKETDATA, nullptr);
615  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
616  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
617  curl_easy_setopt(m_curl.get(), CURLOPT_SSLCERT, nullptr);
618  curl_easy_setopt(m_curl.get(), CURLOPT_SSLKEY, nullptr);
619  curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
620  curl_easy_setopt(m_curl.get(), CURLOPT_CONNECT_TO, nullptr);
621  m_header_slist.reset();
622  m_curl.release();
623 }
624 
625 curl_socket_t
626 CurlOperation::OpenSocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
627 {
628  auto me = reinterpret_cast<CurlOperation*>(clientp);
629  auto fd = me->m_conn_callout_result;
630  me->m_conn_callout_result = -1;
631  if (fd == -1) {
632  std::string err;
633  if ((me->m_conn_callout_listener = me->m_callout->BeginCallout(err, me->m_header_expiry)) == -1) {
634  me->m_logger->Debug(kLogXrdClHttp, "Failed to start a connection callout request: %s", err.c_str());
635  }
636  return CURL_SOCKET_BAD;
637  } else {
638  sockaddr_in *inaddr = reinterpret_cast<sockaddr_in*>(&address->addr);
639  char ip_str[INET_ADDRSTRLEN];
640  char full_address_str[INET_ADDRSTRLEN + 6];
641  inet_ntop(AF_INET, &(inaddr->sin_addr), ip_str, INET_ADDRSTRLEN);
642  int port = ntohs(inaddr->sin_port);
643  snprintf(full_address_str, sizeof(full_address_str), "%s:%d", ip_str, port);
644  me->m_logger->Debug(kLogXrdClHttp, "Recording socket %d for %s", fd, full_address_str);
645  auto reverse_iter = reverse_fake_dns_map.find(full_address_str);
646  if (reverse_iter == reverse_fake_dns_map.end()) {
647  me->m_logger->Error(kLogXrdClHttp, "Failed to find fake DNS reverse entry for %s", full_address_str);
648  close(fd);
649  return CURL_SOCKET_BAD;
650  } else {
651  auto iter = fake_dns_refcount.find(reverse_iter->second.second);
652  if (iter == fake_dns_refcount.end()) {
653  me->m_logger->Error(kLogXrdClHttp, "Failed to find fake DNS refcount entry for %s", full_address_str);
654  close(fd);
655  return CURL_SOCKET_BAD;
656  }
657  iter->second->count++;
658  iter->second->last_used = std::chrono::steady_clock::now();
659  }
660 
661  return fd;
662  }
663 }
664 
665 int
666 CurlOperation::SockOptCallback(void *clientp, curl_socket_t curlfd, curlsocktype purpose)
667 {
668  return CURL_SOCKOPT_ALREADY_CONNECTED;
669 }
670 
671 curl_socket_t
672 CurlOperation::CloseSocketCallback(void *clientp, curl_socket_t fd)
673 {
674  close(fd);
675  auto me = reinterpret_cast<std::string*>(clientp);
676  if (me == nullptr) {return 0;}
677  auto iter = fake_dns_refcount.find(me);
678  if (iter != fake_dns_refcount.end()) {
679  iter->second->count--;
680  if (iter->second->count <= 0 && iter->second->IsExpired(std::chrono::steady_clock::now())) {
681  auto rev_iter = reverse_fake_dns_map.find(*me);
682  if (rev_iter != reverse_fake_dns_map.end()) {
683  fake_dns_map.erase(rev_iter->second.first);
684  reverse_fake_dns_map.erase(rev_iter);
685  }
686  fake_dns_refcount.erase(iter);
687  }
688  }
689 
690  return 0;
691 }
692 
693 void
694 CurlOperation::CleanupDnsCache()
695 {
696  auto now = std::chrono::steady_clock::now();
697  for (auto it = fake_dns_refcount.begin(); it != fake_dns_refcount.end(); ) {
698  if (it->second->count <= 0 && it->second->IsExpired(now)) {
699  auto rev_iter = reverse_fake_dns_map.find(*it->first);
700  if (rev_iter != reverse_fake_dns_map.end()) {
701  fake_dns_map.erase(rev_iter->second.first);
702  reverse_fake_dns_map.erase(rev_iter);
703  }
704  it = fake_dns_refcount.erase(it);
705  } else {
706  ++it;
707  }
708  }
709 }
710 
711 int
712 CurlOperation::XferInfoCallback(void *clientp, curl_off_t /*dltotal*/, curl_off_t dlnow, curl_off_t /*ultotal*/, curl_off_t ulnow)
713 {
714  auto me = reinterpret_cast<CurlOperation*>(clientp);
715  auto now = std::chrono::steady_clock::now();
716  if (me->HeaderTimeoutExpired(now) || me->OperationTimeoutExpired(now)) {
717  return 1; // return value triggers CURLE_ABORTED_BY_CALLBACK
718  }
719  uint64_t xfer_bytes = dlnow > ulnow ? dlnow : ulnow;
720  if (me->TransferStalled(xfer_bytes, now)) {
721  return 1;
722  }
723  return 0;
724 }
725 
726 int
727 CurlOperation::WaitSocketCallback(std::string &err)
728 {
729  m_conn_callout_result = m_callout ? m_callout->FinishCallout(err) : -1;
730  if (m_callout && m_conn_callout_result == -1) {
731  m_logger->Error(kLogXrdClHttp, "Error when getting socket callout: %s", err.c_str());
732  } else if (m_callout) {
733  m_logger->Debug(kLogXrdClHttp, "Got callback socket %d", m_conn_callout_result);
734  }
735  return m_conn_callout_result;
736 }
XErrorCode
Definition: XProtocol.hh:1031
@ kXR_ServerError
Definition: XProtocol.hh:1044
static unsigned long long int stoull(const std::string &s)
simple integer parsing, to be replaced by std::stoll when C++11 can be used
std::chrono::steady_clock::time_point CalculateExpiry(struct timespec timeout)
void CURL
#define close(a)
Definition: XrdPosix.hh:48
int emsg(int rc, char *msg)
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
static int m_minimum_transfer_rate
static constexpr int m_default_minimum_rate
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
static Env * GetEnv()
Get default client environment.
Handle diagnostics.
Definition: XrdClLog.hh:101
Handle an async response.
URL representation.
Definition: XrdClURL.hh:31
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType