XRootD
XrdClHttpOps.hh
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #ifndef XRDCLHTTP_CURLOPS_HH
22 #define XRDCLHTTP_CURLOPS_HH
23 
26 #include "XrdClHttpResponseInfo.hh"
27 #include "XrdClHttpUtil.hh"
28 
29 #include <XrdCl/XrdClBuffer.hh>
31 
32 #include <atomic>
33 #include <memory>
34 #include <string>
35 #include <utility>
36 #include <vector>
37 
38 #include <curl/curl.h>
39 
// Forward declarations of XrdCl types. In this header they are only used
// through pointers, references, or smart pointers, so their full definitions
// are not needed here.
namespace XrdCl {

class Log;
class ResponseHandler;
class URL;

}

// Forward declaration of the TinyXML element type used when parsing the
// bodies of PROPFIND responses.
class TiXmlElement;
49 
50 namespace XrdClHttp {
51 
// Forward declarations of plugin types referenced below only by pointer,
// reference, or smart pointer.
class CurlWorker;
class File;
class ResponseInfo;
55 
57 public:
58  using HeaderList = std::vector<std::pair<std::string, std::string>>;
59 
60  enum class HttpVerb {
61  COPY,
62  DELETE,
63  HEAD,
64  GET,
65  MKCOL,
66  OPTIONS,
67  PROPFIND,
68  PUT,
69  Count
70  };
71 
72  // Operation constructor when the timeout is given as an offset from now.
73  CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
74  XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout);
75 
76 
77  // Operation constructor when the timeout is given as an absolute time.
78  CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, std::chrono::steady_clock::time_point expiry,
79  XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout);
80 
81  virtual ~CurlOperation();
82 
83  CurlOperation(const CurlOperation &) = delete;
84 
85  // Finish the setup of the curl handle
86  //
87  // Used for configuring any extra headers
88  bool FinishSetup(CURL *curl);
89 
90  virtual bool Setup(CURL *curl, CurlWorker &);
91 
92  virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &);
93 
94  virtual void ReleaseHandle();
95 
96  virtual void Success() = 0;
97 
98  // Returns the connection callout function for this operation
99  CreateConnCalloutType GetConnCalloutFunc() const {return m_conn_callout;}
100 
101  // Return the HTTP verb to use with this operation.
102  virtual HttpVerb GetVerb() const = 0;
103 
104  // Return a string version of the HTTP operation
105  static const std::string GetVerbString(HttpVerb);
106 
107  // Returns when the curl header timeout expires.
108  //
109  // The first byte of the header must be received before this time.
110  std::chrono::steady_clock::time_point GetHeaderExpiry() const {return m_header_expiry;}
111 
112  // Returns when the curl operation expires
113  std::chrono::steady_clock::time_point GetOperationExpiry() {
114  if (m_last_xfer == std::chrono::steady_clock::time_point()) {
115  return GetHeaderExpiry();
116  }
117  return m_last_xfer + m_stall_interval;
118  }
119 
120  // Clean up the thread-local DNS cache for fake lookups associated with the
121  // connection callback cache.
122  static void CleanupDnsCache();
123 
124  // Invoked when the worker thread is ready to resume a request after a pause.
125  //
126  // Pauses occur when a PUT request has started but is waiting on more data
127  // from the client; when additional data has arrived, the operation will
128  // be continued and this function called by the worker thread.
129  virtual bool ContinueHandle() {return true;}
130 
131  // Set the continue queue to use for when a paused handle is ready to
132  // be re-run.
133  virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) {}
134 
135  enum class RedirectAction {
136  Fail, // The redirect parsing failed and Fail() was called
137  Reinvoke, // Reinvoke the curl handle, following redirect
138  ReinvokeAfterAllow, // Reinvoke the Redirect function once the allowed verbs are known.
139  };
140  // Handle a redirect to a different URL.
141  // Returns Reinvoke if the curl handle should be invoked again immediately.
142  // Returns ReinvokeAfterAllow if the redirect should be invoked after the allowed verbs are known.
143  // In this case, the operation will set the target to the redirect target.
144  // Implementations must call Fail() if the handler should not re-invoke the curl handle.
145  virtual RedirectAction Redirect(std::string &target);
146 
147  // Indicate whether the result of the operation is a redirect.
148  //
149  // This relies on the response headers having been parsed and available; anything in
150  // the 30X range is considered a redirect.
151  bool IsRedirect() const {return m_headers.GetStatusCode() >= 300 && m_headers.GetStatusCode() < 400;}
152 
153  // If returns non-negative, the result is a FD that should be waited on after a broker connection request.
154  virtual int WaitSocket() {return m_conn_callout_listener;}
155  // Callback when the `WaitSocket` is active for read.
156  virtual int WaitSocketCallback(std::string &err);
157 
158  // Connection broker-related functionality.
159  // When the broker URL is set, the operation will use the connection broker to get a TCP socket
160  // to the remote server. Note that we will initially try the operation without the broker in case the curl
161  // handle has an existing socket it can reuse. If reuse fails, then the operation is going to fail
162  // with CURLE_COULDNT_CONNECT and we will retry (once) to connect via the broker. This is all
163  // done outside curl's open socket callback to ensure the event loop stays non-blocking.
164 
165  bool StartConnectionCallout(std::string &err); // Start the connection callout process for a URL.
166  bool UseConnectionCallout() {return m_callout.get();} // Returns true if the callout should be tried.
167  bool GetTriedBoker() const {return m_tried_broker;} // Returns true if the connection broker has been tried.
168  void SetTriedBoker() {m_tried_broker = true;} // Note that the connection broker has been attempted.
169 
170  // Returns whether the OPTIONS call needs to be made before the operation is started.
171  bool virtual RequiresOptions() const {return false;}
172 
173  // Invoked after the OPTIONS request is done and results are available
174  void virtual OptionsDone() {}
175 
176  // Returns the URL that was used for the operation.
177  const std::string &GetUrl() const {return m_url;}
178 
179  // Returns the response info for the operation
180  std::unique_ptr<ResponseInfo> GetResponseInfo();
181 
182  // Returns true if the header timeout has expired.
183  //
184  // The "header timeout" fires if the remote service has not returned any
185  // headers or data within the specified time.
186  // If the header timeout has expired - and no error has already been set -
187  // the m_error will be set
188  bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now);
189 
190  // Returns true if the operation timeout has expired.
191  //
192  // Some operations (HEAD, PROPFIND for open) return nearly no data and thus have
193  // no need for adaptive timeouts. Instead, we use a fixed timeout.
194  // If the operation timeout has expired - and no error has already been set -
195  // the m_error will be set
196  bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now);
197 
198  // Returns true if the body timeout has expired.
199  //
200  // The "body timeout" fires if the remote service has not returned any
201  // data within the specified time.
202  // If the body timeout has expired - and no error has already been set -
203  // the m_error will be set
204  bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now);
205 
206  enum OpError {
207  ErrNone, // No error
208  ErrHeaderTimeout, // Header was not sent back in time
209  ErrCallback, // Error in the read/write callback (e.g., response too large for propfind)
210  ErrOperationTimeout, // Entire curl request operation has timed out
211  ErrTransferClientStall, // Transfer stalled while client had paused it (no data was available)
212  ErrTransferStall, // Transfer has stalled, not receiving any data within 60 seconds
213  ErrTransferSlow, // Average transfer rate is below the minimum
214  };
215 
216  // Return the libcurl handle owned by this operation.
217  CURL *GetCurlHandle() const {return m_curl.get();}
218 
219  // Return the error generated by the operation itself (separate from a curl error)
220  OpError GetError() const {return m_error;}
221 
222  // Move response info to the caller.
223  std::unique_ptr<ResponseInfo> MoveResponseInfo() {return std::move(m_response_info);}
224 
225  // Return the error generated by the callback (e.g., server has incorrect multipart framing)
226  std::pair<XErrorCode, std::string> GetCallbackError() const {return std::make_pair(m_callback_error_code, m_callback_error_str);}
227 
228  // Returns the HTTP status code (-1 if the response has not been parsed)
229  int GetStatusCode() const {return m_headers.GetStatusCode();}
230 
231  // Returns the HTTP status message (empty if the response has not been parsed)
232  std::string GetStatusMessage() const {return m_headers.GetStatusMessage();}
233 
234  // Return true if the transfer is done
235  bool IsDone() const {return m_done;}
236 
237  // Return true if the operation is paused in libcurl
238  bool IsPaused() const {return m_is_paused;}
239 
240  // Returns true if the operation has been marked as failed.
241  bool HasFailed() const {return m_has_failed.load(std::memory_order_acquire);}
242 
243  // Resets the statistics for the operation and returns a tuple of:
244  // - bytes transferred,
245  // - duration between operation start and header receipt.
246  // - duration between header receipt and now.
247  // - duration the operation has spent on pause in libcurl (waiting for client data)
248  // These numbers are reset to zero each time the `StatisticsReset` function is called.
249  std::tuple<uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration> StatisticsReset();
250 
251 
252  std::string GetCurlErrorMessage() const {
253  if (m_curl_error_buffer[0] != '\0')
254  return m_curl_error_buffer;
255  return "";
256  }
257 
258  // Sets the stall timeout for the operation in seconds.
259  static void SetStallTimeout(int stall_interval)
260  {
261  std::chrono::seconds seconds{stall_interval};
262  m_stall_interval = std::chrono::duration_cast<std::chrono::steady_clock::duration>(seconds);
263  }
264 
265  // Sets the stall timeout for the operation
266  static void SetStallTimeout(const std::chrono::steady_clock::duration &stall_interval)
267  {
268  m_stall_interval = stall_interval;
269  }
270 
271  // Gets the code's default stall timeout in seconds
273  {
274  return std::chrono::duration_cast<std::chrono::seconds>(m_default_stall_interval).count();
275  }
276 
277  // Gets the code's default slow transfer rate
279  {
280  return m_default_minimum_rate;
281  }
282 
283  // Sets the slow transfer rate for transfer operations.
284  static void SetSlowRateBytesSec(int rate)
285  {
287  }
288 
289 protected:
290 
291  // Update the count of bytes transferred
292  void UpdateBytes(uint64_t bytes) {m_bytes += bytes;}
293 
294  // Set failure from a callback function.
295  // The Fail() function may invoke libcurl functions and hence cannot be invoked from a
296  // libcurl callback. This stores the failure in the object itself and the worker
297  // thread will invoke the `Fail()` after libcurl fails the handle.
298  int FailCallback(XErrorCode ecode, const std::string &emsg);
299 
300  // Set the pause status
301  void SetPaused(bool paused);
302 
303  // The default minimum transfer rate for the operation, in bytes / sec
304  static constexpr int m_default_minimum_rate{1024 * 256}; // 256 KB/sec
305 
306  // The current global instance's minimum transfer rate for "transfer type"
307  // operations (GET, PUT). Defaults to the m_default_minimum_rate but can be
308  // overridden by configuration.
310 
311  // The minimum transfer rate for this operation, in bytes / sec
313 
314  // The expiration of the entire operation.
315  std::chrono::steady_clock::time_point m_operation_expiry;
316 
317  // The expiration time for receiving the first header.
318  std::chrono::steady_clock::time_point m_header_expiry;
319 
320  // Any additional headers to send with the request.
322 
323 private:
324  bool Header(const std::string &header);
325  static size_t HeaderCallback(char *buffer, size_t size, size_t nitems, void *data);
326 
327  // Information about the responses received for this operation.
328  std::unique_ptr<ResponseInfo> m_response_info;
329 
330  // The "stall time" for the body transfer.
331  // If the body transfer has not been updated in this time, the operation
332  // will be marked as expired.
333  //
334  // This is also used for the calculation of the interval of the EMA rate
335  static constexpr std::chrono::steady_clock::duration m_default_stall_interval{std::chrono::seconds(60)};
336  static std::chrono::steady_clock::duration m_stall_interval;
337 
338  OpError m_error{ErrNone};
339  XErrorCode m_callback_error_code{kXR_noErrorYet}; // Stored error that occurred in a callback.
340  std::string m_callback_error_str; // Stored error message that occurred in a callback.
341  bool m_tried_broker{false};
342  bool m_received_header{false};
343  bool m_done{false};
344  std::atomic<bool> m_has_failed{false};
345  bool m_is_paused{false};
346  int m_conn_callout_result{-1}; // The result of the connection callout
347  int m_conn_callout_listener{-1}; // The listener socket for the connection callout
348  uint64_t m_bytes{0}; // Count of bytes transferred by operation since last StatisticsReset()
349  std::chrono::steady_clock::time_point m_last_reset{}; // Time of last StatisticsReset()
350  std::chrono::steady_clock::time_point m_last_header_reset{}; // Time of last StatisticsReset() for header statistics
351  std::chrono::steady_clock::time_point m_start_op{}; // Time when the entire operation was started.
352  std::chrono::steady_clock::time_point m_header_start{}; // Time when the first header was received.
353  std::chrono::steady_clock::time_point m_pause_start{}; // Time of the last pause start/reset
354  std::chrono::steady_clock::duration m_pause_duration{}; // Accumulated pause time since last statistics update.
355 
356  // List of custom headers for the operation.
357  std::unique_ptr<struct curl_slist, void(*)(struct curl_slist *)> m_header_slist{nullptr, &curl_slist_free_all};
358 
359  // The callout class for connection creation.
360  CreateConnCalloutType m_conn_callout{nullptr};
361 
362  // The last time header data was received.
363  std::chrono::steady_clock::time_point m_header_lastop;
364 
365  // The last time data was transferred.
366  std::chrono::steady_clock::time_point m_last_xfer;
367 
368  // The last recorded number of bytes that had been transferred.
369  uint64_t m_last_xfer_count{0};
370 
371  // The exponential moving average of the transfer rate
372  double m_ema_rate{-1.0};
373 
374  // Detailed error message populated by libcurl via CURLOPT_ERRORBUFFER.
375  char m_curl_error_buffer[CURL_ERROR_SIZE]{};
376 
377  // Object representing the state of the callout for a connected socket.
378  std::unique_ptr<ConnectionCallout> m_callout;
379  std::unique_ptr<XrdCl::URL> m_parsed_url{nullptr};
380 
381  // A map of endpoints to IP addresses for the CURLOPT_CONNECT_TO option.
382  std::unique_ptr<struct curl_slist, void(*)(struct curl_slist *)> m_resolve_slist{nullptr, &curl_slist_free_all};
383 
384  static curl_socket_t OpenSocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address);
385  static int SockOptCallback(void *clientp, curl_socket_t curlfd, curlsocktype purpose);
386  static curl_socket_t CloseSocketCallback(void *clientp, curl_socket_t item);
387 
388  // Periodic transfer info callback function invoked by curl; used for more fine-grained timeouts.
389  static int XferInfoCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
390 
391 protected:
392  void SetDone(bool has_failed) {m_done = true; m_has_failed.store(has_failed, std::memory_order_release);}
393  const std::string m_url;
395  std::unique_ptr<CURL, void(*)(CURL *)> m_curl;
397  std::vector<std::pair<std::string, std::string>> m_headers_list;
399 };
400 
401 // Query the remote service using the OPTIONS verb.
402 //
403 // This is used to determine the capabilities of the remote service,
404 // such as whether it supports the PROPFIND verb.
405 // Note this does not take an XrdCl::ResponseHandler callback but is meant to be
406 // invoked directly by a libcurl worker which, based on the response, will use
407 // it to invoke the original operation.
408 class CurlOptionsOp final : public CurlOperation {
409 public:
410  CurlOptionsOp(CURL *curl, std::shared_ptr<CurlOperation> op, const std::string &url,
411  XrdCl::Log *log, CreateConnCalloutType callout) :
412  CurlOperation(nullptr, url, op->GetHeaderExpiry(), log, callout, {}),
413  m_parent(op),
414  m_parent_curl(curl)
415  {
417  }
418 
419  virtual ~CurlOptionsOp() {}
420 
421  bool Setup(CURL *curl, CurlWorker &) override;
422  void Success() override;
423  void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override;
424  void ReleaseHandle() override;
425 
426  // Returns the parent operation that has been paused while waiting for the
427  // OPTIONS response.
428  std::shared_ptr<CurlOperation> GetOperation() const {return m_parent;}
429 
430  // Returns the parent operation's curl handle that has been paused while
431  // waiting for the OPTIONS response.
432  CURL *GetParentCurlHandle() const {return m_parent_curl;}
433 
434  virtual HttpVerb GetVerb() const override {return HttpVerb::OPTIONS;}
435 
436 private:
437  std::shared_ptr<CurlOperation> m_parent;
438  CURL *m_parent_curl{nullptr};
439 };
440 
441 // An operation representing a `stat` operation.
442 //
443 // Queries the remote service and parses out the response to a `stat` buffer.
444 // Depending on the remote service, this may be a HEAD or PROPFIND request.
445 class CurlStatOp : public CurlOperation {
446 public:
447  CurlStatOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
448  XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout) :
449  CurlOperation(handler, url, timeout, log, callout, header_callout),
450  m_response_info(response_info)
451  {
453  }
454 
455  virtual ~CurlStatOp() {}
456 
457  bool Setup(CURL *curl, CurlWorker &) override;
458  void Success() override;
459  RedirectAction Redirect(std::string &target) override;
460  void ReleaseHandle() override;
461 
462  bool virtual RequiresOptions() const override;
463  void virtual OptionsDone() override;
464 
465  std::pair<int64_t, bool> GetStatInfo();
466 
467  virtual HttpVerb GetVerb() const override {return m_is_propfind ? HttpVerb::PROPFIND : HttpVerb::HEAD;}
468 
469 protected:
470  // Mark the operation as a success and, as requested, return the stat info back
471  // to the object handler.
472  //
473  // Returning the info is optional as the CurlOpenOp derives from this class and
474  // if stat info is returned from an open without being requested then the
475  // object is leaked
476  void SuccessImpl(bool returnObj);
477 
478 private:
479  // Parse the properties element of a PROPFIND response.
480  std::pair<int64_t, bool> ParseProp(TiXmlElement *prop);
481  // Callback for writing the response body to the internal buffer.
482  static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
483 
484  // Whether the response info variant of the info object should be sent
485  bool m_response_info{false};
486  // Whether the stat request is made using the PROPFIND verb.
487  bool m_is_propfind{false};
488  // Whether the stat response indicated that the object is a directory.
489  bool m_is_dir{false};
490  std::string m_response; // Body of the response (if using PROPFIND)
491  int64_t m_length{-1}; // Length of the object from the response
492 };
493 
494 class CurlOpenOp final : public CurlStatOp {
495 public:
496  CurlOpenOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
497  XrdCl::Log *logger, XrdClHttp::File *file, bool response_info, CreateConnCalloutType callout,
498  HeaderCallout *header_callout);
499 
500  virtual ~CurlOpenOp() {}
501 
502  void ReleaseHandle() override;
503  void Success() override;
504 
505  // Invoked to handle a failure-to-open (HEAD returns non-200)
506  //
507  // If the open operation is invoked for a file with the `New` flag set, this
508  // may be a success if the remote server returned a 404.
509  void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override;
510 
511 private:
512  // Set various common properties after an open has completed.
513  //
514  // If `setSize` is set, then we'll set the file size as a file property.
515  // This is made optional because the open operation may succeed after a 404
516  // (if this was invoked by an open with O_CREAT set); in such a case, setting
517  // the size is nonsensical because the file doesn't exist.
518  void SetOpenProperties(bool setSize);
519 
520  XrdClHttp::File *m_file{nullptr};
521 };
522 
523 // Query the origin for a checksum via a HEAD request.
524 //
525 // Since the open op is a PROPFIND, we need a second operation for checksums.
526 // We expect the checksum only is done after a successful transfer.
527 class CurlChecksumOp final : public CurlStatOp {
528  public:
529  CurlChecksumOp(XrdCl::ResponseHandler *handler, const std::string &url, XrdClHttp::ChecksumType preferred,
530  struct timespec timeout, XrdCl::Log *logger,
531  bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout);
532 
533  virtual ~CurlChecksumOp() {}
534 
535  virtual HttpVerb GetVerb() const override {return HttpVerb::HEAD;}
536  virtual void OptionsDone() override;
537  bool Setup(CURL *curl, CurlWorker &) override;
538  void Success() override;
539  RedirectAction Redirect(std::string &target) override;
540  void ReleaseHandle() override;
541 
542  private:
543  XrdClHttp::ChecksumType m_preferred_cksum{XrdClHttp::ChecksumType::kCRC32C};
544  XrdClHttp::File *m_file{nullptr};
545  };
546 
547 // Operation issuing a DELETE request to the remote server.
548 //
549 class CurlDeleteOp final : public CurlOperation {
550 public:
551  CurlDeleteOp(XrdCl::ResponseHandler *handler, const std::string &url,
552  struct timespec timeout, XrdCl::Log *logger,
553  bool response_info, CreateConnCalloutType callout,
554  HeaderCallout *header_callout);
555 
556  virtual ~CurlDeleteOp();
557 
558  bool Setup(CURL *curl, CurlWorker &) override;
559  void Success() override;
560  void ReleaseHandle() override;
561 
562  virtual HttpVerb GetVerb() const override {return HttpVerb::DELETE;}
563 
564 private:
565  bool m_response_info{false}; // Indicate whether to give extended information in the response.
566 };
567 
568 // Operation issuing a MKCOL request to the remote server.
569 //
570 // Creates a "directory" on the remote side
571 //
572 class CurlMkcolOp final : public CurlOperation {
573 public:
574 CurlMkcolOp(XrdCl::ResponseHandler *handler, const std::string &url,
575  struct timespec timeout, XrdCl::Log *logger,
576  bool response_info, CreateConnCalloutType callout,
577  HeaderCallout *header_callout);
578 
579  virtual ~CurlMkcolOp();
580 
581  void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
582  void ReleaseHandle() override;
583  bool Setup(CURL *curl, CurlWorker &) override;
584  void Success() override;
585 
586  virtual HttpVerb GetVerb() const override {return HttpVerb::MKCOL;}
587 
588 private:
589  bool m_response_info{false}; // Indicate whether to give extended information in the response.
590 };
591 
592 // Cache control query
593 //
594 class CurlQueryOp final : public CurlStatOp {
595 public:
596  CurlQueryOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
597  XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, int queryCode, HeaderCallout *header_callout) :
598  CurlStatOp(handler, url, timeout, log, response_info, callout, header_callout),
599  m_queryCode(queryCode)
600  {
601  }
602 
603  virtual ~CurlQueryOp() {}
604 
605  void Success() override;
606 
608  std::string m_queryVal;
609 };
610 
611 class CurlReadOp : public CurlOperation {
612 public:
613  CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
614  const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
615  char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
616 
617  virtual ~CurlReadOp() {}
618 
619  // Start continuation of a previously-started operation with additional data.
620  bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size);
621 
622  // Make state changes necessary to the curl handle for it to unpause.
623  bool ContinueHandle() override;
624 
625  // Pause the GET operation; indicates the current buffer was sent successfully
626  // but the operation is not yet complete. Will invoke the current callback.
627  virtual void Pause();
628 
629  bool Setup(CURL *curl, CurlWorker &) override;
630  void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
631  void Success() override;
632  void ReleaseHandle() override;
633 
634  virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) override {
635  m_continue_queue = queue;
636  }
637 
638  virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
639 
640 
641 private:
642  // Deliver the current buffer to the response handler and reset internal buffer state.
643  void DeliverResponse();
644 
645  static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
646  size_t Write(char *buffer, size_t size);
647 
648  // Extra response data from curl that overflowed the last buffer
649  //
650  // libcurl's callback is "all or nothing": you cannot accept part of a buffer
651  // then pause the operation until the user provides a new buffer. Hence, we keep
652  // this as the "overflow" buffer; next time Continue() is called, we will process
653  // this data first.
654  std::string m_prefetch_buffer;
655 
656  // Offset into m_prefetch_buffer pointing at the first byte of unconsumed data.
657  size_t m_prefetch_buffer_offset{0};
658 
659  // Offset into the object, for the current Continue() call, relative to m_op.first
660  off_t m_prefetch_object_offset{0};
661 
662  // Default callback handler; used when the HTTP operation times out while there
663  // is no ongoing CurlFile read operation.
664  std::shared_ptr<XrdCl::ResponseHandler> m_default_handler;
665 
666 protected:
667  std::pair<uint64_t, uint64_t> m_op;
668  uint64_t m_written{0}; // Bytes written into the current client-provided buffer
669  char *m_buffer{nullptr}; // Buffer passed by XrdCl; we do not own it.
670  size_t m_buffer_size{0}; // Size of the provided buffer
671 
672  // When the read fails, the body of the response will be copied
673  // here instead of invoking the callback.
674  std::string m_err_msg;
675 
676  // Reference to the continue queue to use when the operation should be resumed.
677  std::shared_ptr<XrdClHttp::HandlerQueue> m_continue_queue;
678 };
679 
680 // Open operation that is actually an entire-object GET
682 public:
683  CurlPrefetchOpenOp(XrdClHttp::File &file, XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
684  const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
685  char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
686  : CurlReadOp(handler, default_handler, url, timeout, op, buffer, sz, logger, callout, header_callout), m_file(file)
687  {}
688 
689  // Special handling of the first "Pause" operation after the read
690  // has started. Do the correct invocation of success or failure.
691  virtual void Pause() override;
692 
693 private:
694  bool m_first_pause{true};
695  XrdClHttp::File &m_file;
696 };
697 
699  public:
700 
701  CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
702  const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
703 
704  virtual ~CurlVectorReadOp() {}
705 
706  bool Setup(CURL *curl, CurlWorker &) override;
707  void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
708  void Success() override;
709  void ReleaseHandle() override;
710 
711  // Set the expected separator between parts of a response;
712  // not expected to be used externally except by unit tests.
713  void SetSeparator(const std::string &sep) {
715  }
716 
717  // Set the status code for the operation
719 
720  // Invoke the write callback for the vector read.
721  //
722  // Note: made public to help unit testing of the class; not intended for direct invocation.
723  size_t Write(char *buffer, size_t size);
724 
725  virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
726 
727  private:
728  static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
729 
730  // Calculate the next request buffer the current response buffer will service.
731  // Sets the m_response_idx and m_skip_bytes
732  void CalculateNextBuffer();
733 
734  protected:
735  size_t m_response_idx{0}; // The offset in the m_chunk_list which the current response chunk will write into.
736  off_t m_chunk_buffer_idx{0}; // Current offset in requested chunk where we are writing bytes.
737  off_t m_bytes_consumed{0}; // Total number of bytes used for results serving the request.
738  uint64_t m_skip_bytes{0}; // Count of bytes to skip in the next response (if response chunk contains unneeded bytes).
739  std::string m_response_headers; // Buffer of an incomplete response line from a prior curl write operation.
740  std::pair<off_t, off_t> m_current_op{-1, -1}; // The (offset, length) of the current response chunk.
741  std::unique_ptr<XrdCl::VectorReadInfo> m_vr; // The response buffers for the client.
742  XrdCl::ChunkList m_chunk_list; // The requested chunks from the client.
743 };
744 
745 class CurlPgReadOp final : public CurlReadOp {
746 public:
747  CurlPgReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
748  const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
749  char *buffer, size_t buffer_size, XrdCl::Log *logger, CreateConnCalloutType callout,
750  HeaderCallout *header_callout)
751  :
752  CurlReadOp(handler, default_handler, url, timeout, op, buffer, buffer_size, logger, callout, header_callout)
753  {}
754 
755  virtual ~CurlPgReadOp() {}
756 
757  void Success() override;
758 
759  virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
760 
761 };
762 
763 class CurlListdirOp final : public CurlOperation {
764 public:
765  CurlListdirOp(XrdCl::ResponseHandler *handler, const std::string &url, const std::string &host_addr, bool response_info,
766  struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
767 
768  virtual ~CurlListdirOp() {}
769 
770  bool Setup(CURL *curl, CurlWorker &) override;
771  void Success() override;
772  void ReleaseHandle() override;
773 
774  virtual HttpVerb GetVerb() const override {return HttpVerb::PROPFIND;}
775 
776 private:
777  struct DavEntry {
778  std::string m_name;
779  bool m_isdir{false};
780  bool m_isexec{false};
781  int64_t m_size{-1};
782  time_t m_lastmodified{-1};
783  };
784  // Parses the properties element of a PROPFIND response into a DavEntry object
785  //
786  // - prop: The properties element to parse
787  // - Returns: A pair containing the DavEntry object and a boolean indicating success or not
788  bool ParseProp(DavEntry &entry, TiXmlElement *prop);
789 
790  // Indicate whether the operation should use the extended "response info" object in response
791  const bool m_response_info{false};
792 
793  // Parses the response element of a PROPFIND
794  std::pair<DavEntry, bool> ParseResponse(TiXmlElement *response);
795 
796  // Callback for writing the response body to the internal buffer.
797  static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
798 
799  // Whether the provided URL is an origin URL (and hence PROPFIND can be done directly).
800  bool m_is_origin{false};
801 
802  // Response body from the PROPFIND request.
803  std::string m_response;
804 
805  // Host address (hostname:port) of the data federation
806  std::string m_host_addr;
807 };
808 
809 // A third-party-copy operation
810 //
811 // Invoke the HTTP COPY verb to copy a file between two HTTP endpoints. The response body acts as a control channel carrying performance markers (progress) and the final success/failure status.
812 class CurlCopyOp final : public CurlOperation {
813 public:
814  using Headers = std::vector<std::pair<std::string, std::string>>; // (name, value) header pairs; identical to the HeaderList alias.
815  // source_hdrs / dest_hdrs: HTTP headers associated with the source and destination endpoints respectively.
816  CurlCopyOp(XrdCl::ResponseHandler *handler, const std::string &source_url, const Headers &source_hdrs, const std::string &dest_url, const Headers &dest_hdrs, struct timespec timeout,
817  XrdCl::Log *logger, CreateConnCalloutType callout);
818 
819  virtual ~CurlCopyOp() {}
820  // Overrides of the CurlOperation hooks.
821  bool Setup(CURL *curl, CurlWorker &) override;
822  void Success() override;
823  void ReleaseHandle() override;
824  // Progress-notification interface (CurlProgressCallback); an implementation is installed via SetCallback() and kept in m_callback.
826  public:
828  virtual void Progress(off_t bytemark) = 0; // Presumably invoked with the byte count from each performance marker.
829  };
830  // Install the progress callback; the operation takes ownership of it.
831  void SetCallback(std::unique_ptr<CurlProgressCallback> callback);
832  // Third-party copies are issued with the HTTP COPY verb.
833  virtual HttpVerb GetVerb() const override {return HttpVerb::COPY;}
834 
835 private:
836  // libcurl-style write callback receiving the response body (the control channel); this_ptr presumably points at the CurlCopyOp.
837  static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
838 
839  // Handle one complete line received on the control channel (presumably updating m_bytemark, m_sent_success or m_failure).
840  void HandleLine(std::string_view line);
841 
842  // Returns true if the control channel has not gotten data recently enough.
843  bool ControlChannelTimeoutExpired() const;
844 
845  // Source URL of the TPC transfer.
846  std::string m_source_url;
847 
848  // Buffer holding the current, possibly incomplete, control-channel line.
849  std::string m_line_buffer;
850 
851  // A callback object for when a performance marker is received.
852  std::unique_ptr<CurlProgressCallback> m_callback;
853 
854  // Byte count from the most recent performance marker; initialized to -1 (presumably meaning no marker received yet).
855  off_t m_bytemark{-1};
856 
857  // Whether the COPY operation indicated a success status in the control channel.
858  bool m_sent_success{false};
859 
860  // Failure string sent back in the control channel.
861  std::string m_failure;
862 };
863 
864 // An upload operation
865 //
866 // Invoke a PUT on the remote HTTP server; assumes that Writes are done
867 // in a single-stream
868 class CurlPutOp final : public CurlOperation {
869 public:
870  CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
871  const std::string &url, const char *buffer, size_t buffer_size,
872  struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
873  HeaderCallout *header_callout);
874  CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
875  const std::string &url, XrdCl::Buffer &&buffer,
876  struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
877  HeaderCallout *header_callout);
878 
879  virtual ~CurlPutOp() {}
880 
881  void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
882  bool Setup(CURL *curl, CurlWorker &) override;
883  void Success() override;
884  void ReleaseHandle() override;
885  bool ContinueHandle() override;
886 
887  virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) override {
888  m_continue_queue = queue;
889  }
890 
891  // Start continuation of a previously-started operation with additional data.
892  //
893  // Since the CurlPutOp itself is kept as a reference-counted pointer by the
894  // XrdClHttp::File handle, we need to pass a shared pointer to the continue queue.
895  // Hence the awkward interface of needing to be provided a shared pointer to oneself.
896  bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size);
897  bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, XrdCl::Buffer &&buffer);
898 
899  // Pause the put operation; indicates the current buffer was sent successfully
900  // but the operation is not yet complete.
901  void Pause();
902 
903  virtual HttpVerb GetVerb() const override {return HttpVerb::PUT;}
904 
905 private:
906 
907  // Callback function for libcurl when it would like to read data from m_data
908  // (and write it to the remote socket).
909  static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v);
910 
911  // Handle that represents the current operation to libcurl
912  CURL *m_curl_handle{nullptr};
913 
914  // Reference to the continue queue to use when the operation should be resumed.
915  std::shared_ptr<XrdClHttp::HandlerQueue> m_continue_queue;
916 
917  // The buffer of data to upload (if the CurlPutOp owns the buffer).
918  XrdCl::Buffer m_owned_buffer;
919 
920  // The non-owned view of the data to upload.
921  // This may reference m_owned_buffer or an externally-owned `const char *`.
922  std::string_view m_data;
923 
924  // The default handler to invoke if an File::Write operation is not pending.
925  // Typically used for timeouts/errors on the PUT operation between client
926  // writes.
927  std::shared_ptr<XrdCl::ResponseHandler> m_default_handler;
928 
929  // File pointer offset
930  off_t m_offset{0};
931 
932  // The final size of the object to be uploaded; -1 if not known
933  off_t m_object_size{-1};
934 
935  bool m_final{false};
936 };
937 
938 } // namespace XrdClHttp
939 
940 #endif // XRDCLHTTP_CURLOPS_HH
XErrorCode
Definition: XProtocol.hh:1031
@ kXR_noErrorYet
Definition: XProtocol.hh:1069
void CURL
XrdOucString File
int emsg(int rc, char *msg)
virtual void OptionsDone() override
RedirectAction Redirect(std::string &target) override
CurlChecksumOp(XrdCl::ResponseHandler *handler, const std::string &url, XrdClHttp::ChecksumType preferred, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
bool Setup(CURL *curl, CurlWorker &) override
virtual void Progress(off_t bytemark)=0
void Success() override
virtual HttpVerb GetVerb() const override
void ReleaseHandle() override
CurlCopyOp(XrdCl::ResponseHandler *handler, const std::string &source_url, const Headers &source_hdrs, const std::string &dest_url, const Headers &dest_hdrs, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout)
std::vector< std::pair< std::string, std::string > > Headers
bool Setup(CURL *curl, CurlWorker &) override
void SetCallback(std::unique_ptr< CurlProgressCallback > callback)
bool Setup(CURL *curl, CurlWorker &) override
CurlDeleteOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
void ReleaseHandle() override
virtual HttpVerb GetVerb() const override
bool Setup(CURL *curl, CurlWorker &) override
CurlListdirOp(XrdCl::ResponseHandler *handler, const std::string &url, const std::string &host_addr, bool response_info, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
bool Setup(CURL *curl, CurlWorker &) override
virtual HttpVerb GetVerb() const override
void ReleaseHandle() override
CurlMkcolOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
void ReleaseHandle() override
void Success() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
CurlOpenOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, XrdClHttp::File *file, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void Success()=0
virtual void OptionsDone()
void SetDone(bool has_failed)
static void SetStallTimeout(const std::chrono::steady_clock::duration &stall_interval)
int FailCallback(XErrorCode ecode, const std::string &emsg)
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
static int m_minimum_transfer_rate
std::chrono::steady_clock::time_point GetHeaderExpiry() const
bool FinishSetup(CURL *curl)
std::vector< std::pair< std::string, std::string > > HeaderList
Definition: XrdClHttpOps.hh:58
const std::string m_url
std::chrono::steady_clock::time_point m_header_expiry
const std::string & GetUrl() const
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
std::pair< XErrorCode, std::string > GetCallbackError() const
bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now)
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
std::string GetCurlErrorMessage() const
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
virtual bool RequiresOptions() const
static void CleanupDnsCache()
std::chrono::steady_clock::time_point GetOperationExpiry()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
virtual bool ContinueHandle()
std::string GetStatusMessage() const
CURL * GetCurlHandle() const
static constexpr int m_default_minimum_rate
static void SetSlowRateBytesSec(int rate)
static void SetStallTimeout(int stall_interval)
CreateConnCalloutType GetConnCalloutFunc() const
Definition: XrdClHttpOps.hh:99
OpError GetError() const
std::vector< std::pair< std::string, std::string > > m_headers_list
HeaderCallout * m_header_callout
static int GetDefaultSlowRateBytesSec()
bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual int WaitSocketCallback(std::string &err)
std::chrono::steady_clock::time_point m_operation_expiry
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, std::chrono::steady_clock::time_point expiry, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
virtual RedirectAction Redirect(std::string &target)
XrdCl::ResponseHandler * m_handler
std::unique_ptr< ResponseInfo > MoveResponseInfo()
static int GetDefaultStallTimeout()
void SetPaused(bool paused)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue)
bool StartConnectionCallout(std::string &err)
std::unique_ptr< ResponseInfo > GetResponseInfo()
bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now)
CurlOperation(const CurlOperation &)=delete
virtual bool Setup(CURL *curl, CurlWorker &)
CURL * GetParentCurlHandle() const
CurlOptionsOp(CURL *curl, std::shared_ptr< CurlOperation > op, const std::string &url, XrdCl::Log *log, CreateConnCalloutType callout)
virtual HttpVerb GetVerb() const override
std::shared_ptr< CurlOperation > GetOperation() const
bool Setup(CURL *curl, CurlWorker &) override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
virtual HttpVerb GetVerb() const override
CurlPgReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t buffer_size, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
CurlPrefetchOpenOp(XrdClHttp::File &file, XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void Pause() override
bool ContinueHandle() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
bool Setup(CURL *curl, CurlWorker &) override
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size)
virtual HttpVerb GetVerb() const override
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue) override
CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, const char *buffer, size_t buffer_size, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
void Success() override
void ReleaseHandle() override
CurlQueryOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, int queryCode, HeaderCallout *header_callout)
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
std::pair< uint64_t, uint64_t > m_op
void Success() override
bool Setup(CURL *curl, CurlWorker &) override
std::shared_ptr< XrdClHttp::HandlerQueue > m_continue_queue
bool ContinueHandle() override
CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue) override
virtual HttpVerb GetVerb() const override
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
void ReleaseHandle() override
void ReleaseHandle() override
void SuccessImpl(bool returnObj)
virtual bool RequiresOptions() const override
bool Setup(CURL *curl, CurlWorker &) override
RedirectAction Redirect(std::string &target) override
std::pair< int64_t, bool > GetStatInfo()
CurlStatOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
virtual void OptionsDone() override
void Success() override
size_t Write(char *buffer, size_t size)
CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
void SetSeparator(const std::string &sep)
XrdCl::ChunkList m_chunk_list
virtual HttpVerb GetVerb() const override
std::pair< off_t, off_t > m_current_op
std::unique_ptr< XrdCl::VectorReadInfo > m_vr
bool Setup(CURL *curl, CurlWorker &) override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
void SetMultipartSeparator(const std::string_view &sep)
std::string GetStatusMessage() const
void SetStatusCode(int sc)
Binary blob representation.
Definition: XrdClBuffer.hh:34
Handle diagnostics.
Definition: XrdClLog.hh:101
Handle an async response.
std::vector< ChunkInfo > ChunkList
List of chunks.
XrdSysError Log
Definition: XrdConfig.cc:113
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType