XRootD
XrdClHttpFile.hh
Go to the documentation of this file.
1 /***************************************************************
2  *
3  * Copyright (C) 2025, Morgridge Institute for Research
4  *
5  ***************************************************************/
6 
7 #ifndef XRDCLHTTP_CURLFILE_HH
8 #define XRDCLHTTP_CURLFILE_HH
9 
12 
#include <XrdCl/XrdClFile.hh>

#include <atomic>
#include <condition_variable>
#include <ctime>
#include <deque>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>
27 
// Forward declarations. Within this header these types are only used behind a
// raw pointer (XrdCl::Log *) or a std::shared_ptr (HandlerQueue, CurlReadOp,
// CurlPutOp), so their complete definitions are not needed here.
namespace XrdCl { class Log; }

namespace XrdClHttp {
class HandlerQueue;
class CurlReadOp;
class CurlPutOp;
} // namespace XrdClHttp
41 namespace XrdClHttp {
42 
43 class File final : public XrdCl::FilePlugIn {
44 public:
45  File(std::shared_ptr<XrdClHttp::HandlerQueue> queue, XrdCl::Log *log) :
46  m_queue(queue),
47  m_logger(log),
48  m_default_put_handler(new PutDefaultHandler(*this))
49  {}
50 
51  virtual ~File() noexcept;
52 
53  virtual XrdCl::XRootDStatus Open(const std::string &url,
54  XrdCl::OpenFlags::Flags flags,
55  XrdCl::Access::Mode mode,
56  XrdCl::ResponseHandler *handler,
57  time_t timeout) override;
58 
59  virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler,
60  time_t timeout) override;
61 
62  virtual XrdCl::XRootDStatus Stat(bool force,
63  XrdCl::ResponseHandler *handler,
64  time_t timeout) override;
65 
66  virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg,
67  XrdCl::ResponseHandler *handler,
68  time_t timeout) override;
69 
70  virtual XrdCl::XRootDStatus Read(uint64_t offset,
71  uint32_t size,
72  void *buffer,
73  XrdCl::ResponseHandler *handler,
74  time_t timeout) override;
75 
76  virtual XrdCl::XRootDStatus PgRead(uint64_t offset,
77  uint32_t size,
78  void *buffer,
79  XrdCl::ResponseHandler *handler,
80  time_t timeout) override;
81 
82  virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks,
83  void *buffer,
84  XrdCl::ResponseHandler *handler,
85  time_t timeout ) override;
86 
87  virtual XrdCl::XRootDStatus Write(uint64_t offset,
88  uint32_t size,
89  const void *buffer,
90  XrdCl::ResponseHandler *handler,
91  time_t timeout) override;
92 
93  virtual XrdCl::XRootDStatus Write(uint64_t offset,
94  XrdCl::Buffer &&buffer,
95  XrdCl::ResponseHandler *handler,
96  time_t timeout) override;
97 
98  virtual bool IsOpen() const override;
99 
100  virtual bool SetProperty( const std::string &name,
101  const std::string &value ) override;
102 
103  virtual bool GetProperty( const std::string &name,
104  std::string &value ) const override;
105 
106  // Returns the flags used to open the file
107  XrdCl::OpenFlags::Flags Flags() const {return m_open_flags;}
108 
109  // Sets the minimum client timeout
110  static void SetMinimumHeaderTimeout(struct timespec &ts) {m_min_client_timeout.tv_sec = ts.tv_sec; m_min_client_timeout.tv_nsec = ts.tv_nsec;}
111 
112  // Gets the minimum client timeout
113  static const struct timespec &GetMinimumHeaderTimeout() {return m_min_client_timeout;}
114 
115  // Sets the default header timeout
116  static void SetDefaultHeaderTimeout(struct timespec &ts) {m_default_header_timeout.tv_sec = ts.tv_sec; m_default_header_timeout.tv_nsec = ts.tv_nsec;}
117 
118  // Gets the default header timeout
119  static const struct timespec &GetDefaultHeaderTimeout() {return m_default_header_timeout;}
120 
121  // Sets the open file's header timeout
122  void SetHeaderTimeout(const struct timespec &ts) {m_header_timeout.tv_sec = ts.tv_sec; m_header_timeout.tv_nsec = ts.tv_nsec;}
123 
124  // Get the header timeout value, taking into consideration the contents of the header and XrdCl's default values
125  static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger);
126 
127  // Get the header timeout value, taking into consideration the provided command timeout, the existing open timeout, and XrdCl's default values
128  struct timespec GetHeaderTimeout(time_t oper_timeout) const;
129 
130  // Get the header timeout value, taking into consideration the provided command timeout, a default timeout, and XrdCl's default values
131  static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout);
132 
133  // Set the federation metadata timeout
134  static void SetFederationMetadataTimeout(const struct timespec &ts) {m_fed_timeout.tv_sec = ts.tv_sec; m_fed_timeout.tv_nsec = ts.tv_nsec;}
135 
136  // Get the federation metadata timeout
137  static struct timespec GetFederationMetadataTimeout() {return m_fed_timeout;}
138 
139  // Get the global monitoring statistics data
140  static std::string GetMonitoringJson();
141 
142 private:
143 
144  // Try to read a buffer via the prefetch mechanism.
145  //
146  // Returns tuple (status, ok); if `ok` is set to true, then the operation
147  // was attempted. Otherwise, the operation was skipped and `status` should
148  // be ignored.
149  std::tuple<XrdCl::XRootDStatus, bool> ReadPrefetch(uint64_t offset, uint64_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout, bool isPgRead);
150 
151  // The "*Response" variant of the callback response objects defined in DirectorCacheResponse.hh
152  // are opt-in; if the caller isn't expecting them, then they will leak memory. This
153  // function determines whether the opt-in is enabled.
154  bool SendResponseInfo() const;
155 
156  // Returns a pointer to the connection callout function
157  CreateConnCalloutType GetConnCallout() const;
158 
159  // Get the current URL to use for file operations.
160  //
161  // The `XrdClHttpQueryParam` property allows for additional query parameters to be added by
162  // the owner of the file handle; these can change while the file is open (for example, if there
163  // is a credential in the query parameter that might expire) so we must reconstruct the URL,
164  // even for `Read`-type calls.
165  const std::string GetCurrentURL() const;
166 
167  // Calculate the current URL given the query parameter value
168  //
169  // Must be called with the m_properties_mutex held for write.
170  void CalculateCurrentURL(const std::string &value) const;
171 
172  bool m_is_opened{false};
173  std::atomic<bool> m_full_download{false}; // Whether the file was in "full download mode" when opened.
174 
175  // The flags used to open the file
177 
178  std::string m_url; // The URL as given to the Open() method.
179  std::string m_last_url; // The last server the file was connected to after Open() (potentially after redirections)
180  mutable std::string m_url_current; // The URL to use for future HTTP requests; may be the last URL plus additional query parameters.
181  std::shared_ptr<XrdClHttp::HandlerQueue> m_queue;
182  XrdCl::Log *m_logger{nullptr};
183  std::unordered_map<std::string, std::string> m_properties;
184 
185  // Protects the contents of m_properties
186  mutable std::shared_mutex m_properties_mutex;
187 
188  // The header timeout for the current file
189  struct timespec m_timeout{0, 0};
190 
191  // The minimum timeout value requested by a client that we will honor
192  static struct timespec m_min_client_timeout;
193 
194  // The default header timeout.
195  static struct timespec m_default_header_timeout;
196 
197  // The per-file header timeout.
198  struct timespec m_header_timeout;
199 
200  // The federation metadata timeout.
201  static struct timespec m_fed_timeout;
202 
203  // An in-progress put operation.
204  //
205  // This shared pointer is also copied to the queue and kept
206  // by the curl worker thread. We will need to refer to the
207  // operation later to continue the write.
208  std::shared_ptr<XrdClHttp::CurlPutOp> m_put_op;
209 
210  // A response handler for PUT operations that ensures multiple writes are serialized.
211  class PutResponseHandler : public XrdCl::ResponseHandler {
212  public:
213  PutResponseHandler(XrdCl::ResponseHandler *handler);
214 
215  virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) override;
216 
217  XrdCl::XRootDStatus QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler);
218 
219  void SetOp(std::shared_ptr<XrdClHttp::CurlPutOp> op) {m_op = op;}
220 
221  void WaitForCompletion();
222 
223  private:
224  bool m_active{true};
225  bool m_initial{true};
226  std::shared_ptr<XrdClHttp::CurlPutOp> m_op;
227  XrdCl::ResponseHandler *m_active_handler;
228  std::condition_variable m_cv;
229  std::mutex m_mutex;
230  std::deque<std::tuple<std::variant<std::pair<const void *, size_t>, XrdCl::Buffer>, XrdCl::ResponseHandler*>> m_pending_writes;
231 
232  // Start the next pending write operation.
233  // Returns false if the active handler was invoked due to an error.
234  bool ProcessQueue();
235  };
236 
237  // The callback handler for the in-progress put operation.
238  // This handler wraps the user-provided handler to ensure
239  // that multiple writes are serialized.
240  std::atomic<PutResponseHandler *>m_put_handler{nullptr};
241 
242  // Ultimate length of the in-progress PUT operation
243  off_t m_asize{-1};
244 
245  // Handle a failure in the PUT code while there are no outstanding
246  // write requests
247  class PutDefaultHandler : public XrdCl::ResponseHandler {
248  public:
249  PutDefaultHandler(File &file) : m_logger(file.m_logger) {}
250 
251  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
252 
253  private:
254  XrdCl::Log *m_logger{nullptr};
255  };
256 
257  // The default object for all put failures
258  std::shared_ptr<PutDefaultHandler> m_default_put_handler;
259 
260  // An in-progress GET operation
261  //
262  // For the first read from the file, we will issue a GET for
263  // the entire rest of the file. As in-sequence reads are
264  // encountered, they will be fed from the prefetch operation instead
265  // of standalone reads.
266  std::shared_ptr<XrdClHttp::CurlReadOp> m_prefetch_op;
267 
268  // Next offset for prefetching.
269  // Protected by m_prefetch_mutex
270  std::atomic<off_t> m_prefetch_offset{0};
271 
272  // Prefetch callback handler class
273  //
274  // Objects form a linked list of pending prefetch handlers.
275  // Once the first entry in the list is completed, it will pass the prefetch
276  // operation to the subsequent entry.
277  class PrefetchResponseHandler : public XrdCl::ResponseHandler {
278  public:
279 
280  // Constructor for the prefetch response handler.
281  // - `parent`: The parent file object for the prefetch.
282  // - `offset`: The offset within the file to start the prefetch.
283  // - `size`: The size of the prefetch operation.
284  // - `prefetch_offset`: A reference to the prefetch offset. As the prefetch
285  // operation progresses, this offset will be updated to reflect the new
286  // position in the file. Lifetime must exceed that of the response handler.
287  // - `buffer`: A pointer to the buffer to store the prefetch data.
288  // - `handler`: The response handler for the prefetch operation.
289  // - `lock`: A unique lock for the prefetch operation. If `lock` is the `nullptr`,
290  // then we assume this is called during the creation of the m_prefetch_op and we
291  // will assume that this is NOT a continuation of the existing operation. In that
292  // case, the lock will not be dropped during the constructor. A reference to the
293  // lock is not taken outside the constructor. The lock must be held when the
294  // constructor is called.
295  // - `timeout`: The timeout for the prefetch operation.
296  //
297  // The constructor can throw a std::runtime_exception if the handler would have
298  // continued an ongoing prefetch operation but it failed to submit it.
299  PrefetchResponseHandler(File &parent,
300  off_t offset, size_t size, std::atomic<off_t> *prefetch_offset, char *buffer, XrdCl::ResponseHandler *handler,
301  std::unique_lock<std::mutex> *lock, time_t timeout);
302 
303  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
304 
305  private:
306  // When the prefetch fails, we must resubmit our handler as a non-prefetching read.
307  void ResubmitOperation();
308 
309  // The open file we are associated with
310  File &m_parent;
311 
312  // A reference to the handler we are wrapping. Note we don't own the handler
313  // so this is not a unique_ptr.
314  XrdCl::ResponseHandler *m_handler;
315 
316  // A reference to the next handle in the linked list.
317  PrefetchResponseHandler *m_next{nullptr};
318 
319  // The buffer for this prefetch callback
320  char *m_buffer{nullptr};
321 
322  // The size of the prefetch callback buffer
323  size_t m_size{0};
324 
325  // The offset of the operation within the file.
326  off_t m_offset{0};
327 
328  // A pointer to the prefetch offset. If the read is shorter than we had
329  // expected, we'll decrease the offset pointer to match the actual size.
330  std::atomic<off_t> *m_prefetch_offset{nullptr};
331 
332 
333  // The desired timeout for the operation.
334  time_t m_timeout{0};
335  };
336 
337  // Last prefetch handler on the stack.
338  PrefetchResponseHandler *m_last_prefetch_handler{nullptr};
339 
340  // Size of prefetch operation
341  off_t m_prefetch_size{-1};
342 
343  // Offset of the next write operation;
344  std::atomic<off_t> m_put_offset{0};
345 
346  // Handle a failure in the prefetch code while there is no outstanding
347  // read requests
348  //
349  // The status of the File's prefetching is kept in this class because the callback's
350  // lifetime is independent of the File and the callback needs to be able to disable
351  // prefetching.
352  class PrefetchDefaultHandler : public XrdCl::ResponseHandler {
353  public:
354  PrefetchDefaultHandler(File &file) : m_logger(file.m_logger), m_url(file.m_url) {}
355 
356  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response);
357 
358  // Disable prefetching for all future operations
359  void DisablePrefetch() {
360  auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
361  if (enabled) {
362  std::unique_lock lock(m_prefetch_mutex);
363  m_prefetch_enabled.store(false, std::memory_order_relaxed);
364  }
365  }
366 
367  // Determine if we are prefetching
368  bool IsPrefetching() const {
369  auto enabled = m_prefetch_enabled.load(std::memory_order_relaxed);
370  if (enabled) {
371  std::unique_lock lock(m_prefetch_mutex);
372  return m_prefetch_enabled.load(std::memory_order_relaxed);
373  }
374  return false;
375  }
376 
377  XrdCl::Log *m_logger{nullptr};
378  std::string m_url;
379 
380  // Mutex protecting the state of the in-progress GET operation
381  // and relevant callback handlers and state
382  mutable std::mutex m_prefetch_mutex;
383 
384  // Whether prefetching is active
385  //
386  // If set to "false", then prefetch is disabled.
387  // If set to "true", then you must re-read the value with
388  // m_prefetch_mutex held to ensure is actually true and not
389  // a spurious reading.
390  mutable std::atomic<bool> m_prefetch_enabled{true};
391  };
392 
393  // "Default" handler for prefetching
394  //
395  // When there is no outstanding read operation but the prefetch
396  // operation fails, this will be called
397  std::shared_ptr<PrefetchDefaultHandler> m_default_prefetch_handler;
398 
399  // Pointer to the header callout function
400  std::atomic<XrdClHttp::HeaderCallout *> m_header_callout{nullptr};
401 
402  // Class for setting up the required HTTP headers for S3 requests
403  class HeaderCallout : public XrdClHttp::HeaderCallout {
404  public:
405  HeaderCallout(File &fs) : m_parent(fs)
406  {}
407 
408  virtual ~HeaderCallout() noexcept = default;
409 
410  virtual std::shared_ptr<HeaderList> GetHeaders(const std::string &verb,
411  const std::string &url,
412  const HeaderList &headers) override;
413 
414  private:
415  File &m_parent;
416  };
417 
418  HeaderCallout m_default_header_callout{*this};
419 
420  static std::atomic<uint64_t> m_prefetch_count; // Count of prefetch operations that have been initiated.
421  static std::atomic<uint64_t> m_prefetch_expired_count; // Count of prefetch operations that have expired due to unused data.
422  static std::atomic<uint64_t> m_prefetch_failed_count; // Count of prefetch operations that have failed due to errors.
423  static std::atomic<uint64_t> m_prefetch_reads_hit; // Count of read operations served from prefetch data.
424  static std::atomic<uint64_t> m_prefetch_reads_miss; // Count of read operations that were not served from prefetch data.
425  static std::atomic<uint64_t> m_prefetch_bytes_used; // Count of prefetch operations that have succeeded.
426 };
427 
428 }
429 
430 #endif // XRDCLHTTP_CURLFILE_HH
static std::string ts()
timestamp output for logging messages
Definition: XrdCephOss.cc:53
static void parent()
int Mode
struct timespec GetHeaderTimeout(time_t oper_timeout) const
File(std::shared_ptr< XrdClHttp::HandlerQueue > queue, XrdCl::Log *log)
virtual bool IsOpen() const override
virtual XrdCl::XRootDStatus Stat(bool force, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Open(const std::string &url, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode mode, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual bool GetProperty(const std::string &name, std::string &value) const override
void SetHeaderTimeout(const struct timespec &ts)
static const struct timespec & GetMinimumHeaderTimeout()
virtual XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual ~File() noexcept
virtual XrdCl::XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
XrdCl::OpenFlags::Flags Flags() const
static void SetFederationMetadataTimeout(const struct timespec &ts)
virtual bool SetProperty(const std::string &name, const std::string &value) override
static void SetDefaultHeaderTimeout(struct timespec &ts)
static const struct timespec & GetDefaultHeaderTimeout()
static struct timespec GetFederationMetadataTimeout()
virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
static std::string GetMonitoringJson()
static void SetMinimumHeaderTimeout(struct timespec &ts)
virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger)
virtual XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
Binary blob representation.
Definition: XrdClBuffer.hh:34
An interface for file plug-ins.
Handle diagnostics.
Definition: XrdClLog.hh:101
Handle an async response.
std::vector< ChunkInfo > ChunkList
List of chunks.
XrdSysError Log
Definition: XrdConfig.cc:113
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
Flags
Open flags, may be or'd when appropriate.