XRootD
XrdClHttpFile.cc
Go to the documentation of this file.
1 /***************************************************************
2  *
3  * Copyright (C) 2025, Morgridge Institute for Research
4  *
5  ***************************************************************/
6 
7 #include "XrdClHttpFile.hh"
8 #include "XrdClHttpFilesystem.hh"
9 #include "XrdClHttpOps.hh"
10 #include "XrdClHttpParseTimeout.hh"
11 #include "XrdClHttpResponses.hh"
12 #include "XrdClHttpUtil.hh"
13 #include "XrdClHttpWorker.hh"
14 
15 #include <XrdCl/XrdClConstants.hh>
16 #include <XrdCl/XrdClDefaultEnv.hh>
17 #include <XrdCl/XrdClLog.hh>
18 #include <XrdCl/XrdClStatus.hh>
19 #include <XrdCl/XrdClURL.hh>
20 #include <XrdOuc/XrdOucCRC.hh>
21 #include <XrdSys/XrdSysPageSize.hh>
22 #include <XrdOuc/XrdOucJson.hh>
23 
24 #include <charconv>
25 #include <iostream>
26 
27 using namespace XrdClHttp;
28 
// Out-of-class definitions of File's static prefetch statistics counters, all
// starting at zero. Their current values are serialized into the "prefetch"
// JSON object built later in this file.
29 std::atomic<uint64_t> File::m_prefetch_count = 0;
30 std::atomic<uint64_t> File::m_prefetch_expired_count = 0;
31 std::atomic<uint64_t> File::m_prefetch_failed_count = 0;
32 std::atomic<uint64_t> File::m_prefetch_reads_hit = 0;
33 std::atomic<uint64_t> File::m_prefetch_reads_miss = 0;
34 std::atomic<uint64_t> File::m_prefetch_bytes_used = 0;
35 
36 namespace {
37 
38 // A response handler for the file open operation when "full download" is requested.
39 //
40 // In this case, the open triggers a GET of the entire object with a zero-sized buffer;
41 // that means the response handler is invoked as soon as the GET response is started.
42 // Subsequent calls to Read() will return the data from the GET response.
43 class OpenFullDownloadResponseHandler : public XrdCl::ResponseHandler {
44 public:
// is_opened: optional flag that is set to true when the open succeeds.
// send_response_info: when true, the wrapped handler receives an object
//   carrying the response info instead of a null response.
// handler: the caller's handler to invoke with the final result (may be null).
45  OpenFullDownloadResponseHandler(bool *is_opened, bool send_response_info, XrdCl::ResponseHandler *handler)
46  : m_send_response_info(send_response_info), m_is_opened(is_opened), m_handler(handler)
47  {}
48 
// Callback for the GET issued at open time. The handler deletes itself when
// this function returns (see `holder`), and takes ownership of both arguments.
49  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
50  std::unique_ptr<OpenFullDownloadResponseHandler> holder(this);
51  std::unique_ptr<XrdCl::AnyObject> response_holder(response);
52  std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
53 
// Failure (or missing status): forward status and response untouched to the
// wrapped handler; if there is none, the holders free them.
54  if (!status || !status->IsOK()) {
55  if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
56  return;
57  }
58  if (m_is_opened) *m_is_opened = true;
59  if (!m_handler) {
60  return;
61  }
62  if (m_send_response_info) {
// NOTE(review): `response` is dereferenced without a null check here — confirm
// that a successful status always comes with a non-null response.
63  XrdCl::ChunkInfo *ci = nullptr;
64  response->Get(ci);
65  if (!ci) {
66  m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal, ENOENT, "No ChunkInfo in response"), nullptr);
67  return;
68  }
// NOTE(review): `ci` was obtained from `response`, which is still held by
// `response_holder`, and is additionally wrapped in a unique_ptr below —
// confirm the AnyObject does not also delete it.
69  std::unique_ptr<XrdClHttp::ReadResponseInfo> read_response_info(static_cast<XrdClHttp::ReadResponseInfo *>(ci));
70  auto info = read_response_info->GetResponseInfo();
// NOTE(review): listing line 71 (the declaration of `open_info`) is missing
// from this extract — restore it from the original source.
72  open_info->SetResponseInfo(std::move(info));
73  auto obj = new XrdCl::AnyObject();
74  obj->Set(open_info);
75  m_handler->HandleResponse(status_holder.release(), obj);
76  } else {
// No response info requested: report success with a null response object.
77  m_handler->HandleResponse(status_holder.release(), nullptr);
78  }
79  }
80 private:
81  bool m_send_response_info; // If true, the response handler will set the response info object.
82  bool *m_is_opened; // If the file-open is successful, this will be set to true.
83  XrdCl::ResponseHandler *m_handler; // The handler to call with the final result
84 };
85 
86 // A response handler for the "normal" open mode (which typically translates
87 // to a HEAD or PROPFIND).
88 class OpenResponseHandler : public XrdCl::ResponseHandler {
89 public:
90  OpenResponseHandler(bool *is_opened, XrdCl::ResponseHandler *handler)
91  : m_is_opened(is_opened), m_handler(handler)
92  {}
93 
94  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
95  std::unique_ptr<OpenResponseHandler> holder(this);
96  std::unique_ptr<XrdCl::AnyObject> response_holder(response);
97  std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
98 
99  if (!status || !status->IsOK()) {
100  if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
101  return;
102  }
103  if (m_is_opened) *m_is_opened = true;
104  if (!m_handler) {
105  return;
106  }
107  m_handler->HandleResponse(status_holder.release(), response_holder.release());
108  }
109 
110 private:
111  bool *m_is_opened; // If the file-open is successful, this will be set to true.
112  XrdCl::ResponseHandler *m_handler; // The handler to call with the final result
113 };
114 
115 // A response handler that transforms the read result into a PageInfo object.
116 // This is used for page reads which require a checksum of each page; note
117 // this is computed client-side whereas for the xroot protocol the checksum is computed server-side.
118 class PgReadResponseHandler : public XrdCl::ResponseHandler {
119 public:
120  PgReadResponseHandler(XrdCl::ResponseHandler *handler)
121  : m_handler(handler)
122  {}
123 
124  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
125  std::unique_ptr<PgReadResponseHandler> holder(this);
126  if (!status || !status->IsOK()) {
127  if (m_handler) m_handler->HandleResponse(status, response);
128  else delete response;
129  return;
130  }
131  if (!m_handler) {
132  delete response;
133  return;
134  }
135 
136  // Transform the read result ChunkInfo into a PageInfo.
137  XrdCl::ChunkInfo *ci = nullptr;
138  response->Get(ci);
139  if (!ci) {
140  delete response;
141  if (m_handler) m_handler->HandleResponse(status, nullptr);
142  return;
143  }
144  std::vector<uint32_t> cksums;
145  size_t nbpages = ci->GetLength() / XrdSys::PageSize;
146  if (ci->GetLength() % XrdSys::PageSize) ++nbpages;
147  cksums.reserve(nbpages);
148 
149  auto buffer = static_cast<const char *>(ci->GetBuffer());
150  size_t size = ci->GetLength();
151  for (size_t pg=0; pg<nbpages; ++pg)
152  {
153  auto pgsize = static_cast<size_t>(XrdSys::PageSize);
154  if (pgsize > size) pgsize = size;
155  cksums.push_back(XrdOucCRC::Calc32C(buffer, pgsize));
156  buffer += pgsize;
157  size -= pgsize;
158  }
159 
160  auto page_info = new XrdCl::PageInfo(ci->GetOffset(), ci->GetLength(), ci->GetBuffer(), std::move(cksums));
161  auto obj = new XrdCl::AnyObject();
162  obj->Set(page_info);
163  delete response;
164  auto handle = m_handler;
165  m_handler = nullptr;
166  handle->HandleResponse(status, obj);
167  }
168 
169 private:
170  XrdCl::ResponseHandler *m_handler;
171 };
172 
173 // A response handler for close operations that require creating a zero-length
174 // object.
175 class CloseCreateHandler : public XrdCl::ResponseHandler {
176 public:
177  CloseCreateHandler(XrdCl::ResponseHandler *handler)
178  : m_handler(handler)
179  {}
180 
181  virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
182  std::unique_ptr<CloseCreateHandler> self(this);
183  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
184  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
185 
186  if (m_handler) m_handler->HandleResponse(status.release(), nullptr);
187  }
188 
189 private:
190  XrdCl::ResponseHandler *m_handler;
191 };
192 
193 } // anonymous namespace
194 
195 // Note: these values are typically overwritten by `CurlFactory::CurlFactory`;
196 // they are set here just to avoid uninitialized globals.
// Each initializer is {tv_sec, tv_nsec}.
197 struct timespec XrdClHttp::File::m_min_client_timeout = {2, 0};
// NOTE(review): {9, 5} is 9 seconds plus 5 nanoseconds. If 9.5 seconds was
// intended, tv_nsec would need to be 500000000 — confirm.
198 struct timespec XrdClHttp::File::m_default_header_timeout = {9, 5};
199 struct timespec XrdClHttp::File::m_fed_timeout = {5, 0};
200 
201 
202 File::~File() noexcept {
203  auto handler = m_put_handler.load(std::memory_order_acquire);
204  if (handler) {
205  // We must wait for all ongoing writes to complete; the XrdCl::File
206  // destructor will trigger a Close() operation when it is called without
207  // waiting for the Close to finish, then invoke our destructor.
208  // If the Close() is still ongoing, then the handler will receive a
209  // callback after its memory is freed.
210  handler->WaitForCompletion();
211  delete handler;
212  }
213 }
214 
216 File::GetConnCallout() const {
217  std::string pointer_str;
218  if (!GetProperty("XrdClConnectionCallout", pointer_str) && pointer_str.empty()) {
219  return nullptr;
220  }
221  long long pointer;
222  try {
223  pointer = std::stoll(pointer_str, nullptr, 16);
224  } catch (...) {
225  return nullptr;
226  }
227  if (!pointer) {
228  return nullptr;
229  }
230  return reinterpret_cast<CreateConnCalloutType>(pointer);
231 }
232 
233 struct timespec
234 File::ParseHeaderTimeout(const std::string &timeout_string, XrdCl::Log *logger)
235 {
236  struct timespec ts = File::GetDefaultHeaderTimeout();
237  if (!timeout_string.empty()) {
238  std::string errmsg;
239  // Parse the provided timeout and decrease by a second if we can (if it's below a second, halve it).
240  // The thinking is that if the client needs a response in N seconds, then we ought to set the internal
241  // timeout to (N-1) seconds to provide enough time for our response to arrive at the client.
242  if (!XrdClHttp::ParseTimeout(timeout_string, ts, errmsg)) {
243  logger->Error(kLogXrdClHttp, "Failed to parse xrdclhttp.timeout parameter: %s", errmsg.c_str());
244  } else if (ts.tv_sec >= 1) {
245  ts.tv_sec--;
246  } else {
247  ts.tv_nsec /= 2;
248  }
249  }
250  const auto mct = File::GetMinimumHeaderTimeout();
251  if (ts.tv_sec < mct.tv_sec ||
252  (ts.tv_sec == mct.tv_sec && ts.tv_nsec < mct.tv_nsec))
253  {
254  ts.tv_sec = mct.tv_sec;
255  ts.tv_nsec = mct.tv_nsec;
256  }
257 
258  return ts;
259 }
260 
// Chooses the header timeout for one operation: the smaller of the operation
// timeout (whole seconds) and the supplied header timeout.
//
// oper_timeout:   per-operation timeout in seconds; 0 selects the value of the
//                 "RequestTimeout" setting from the XrdCl default environment.
// header_timeout: the header timeout to fall back on.
261 struct timespec
262 File::GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
263 {
264  if (oper_timeout == 0) {
// NOTE(review): listing line 265 (the declaration of `val`) is missing from
// this extract — restore it from the original source.
266  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
267  oper_timeout = val;
268  }
// Still no positive operation timeout: the header timeout stands as-is.
269  if (oper_timeout <= 0) {
270  return header_timeout;
271  }
// Equal seconds: the result is capped at the whole-second operation timeout,
// so any nanosecond part of the header timeout is dropped.
272  if (oper_timeout == header_timeout.tv_sec) {
273  return {header_timeout.tv_sec, 0};
274  } else if (header_timeout.tv_sec < oper_timeout) {
275  return header_timeout;
276  } else { // header timeout is larger than the operation timeout
277  return {oper_timeout, 0};
278  }
279 }
280 
281 struct timespec
282 File::GetHeaderTimeout(time_t oper_timeout) const
283 {
284  return GetHeaderTimeoutWithDefault(oper_timeout, m_header_timeout);
285 }
286 
// Builds a JSON document of the form {"prefetch": {...}} containing the
// current values of the six static prefetch counters.
// NOTE(review): listing line 288 (the function name and parameter list) is
// missing from this extract — restore it from the original source.
287 std::string
290 {
291  return "{\"prefetch\": {"
292  "\"count\": " + std::to_string(m_prefetch_count) + ","
293  "\"expired\": " + std::to_string(m_prefetch_expired_count) + ","
294  "\"failed\": " + std::to_string(m_prefetch_failed_count) + ","
295  "\"reads_hit\": " + std::to_string(m_prefetch_reads_hit) + ","
296  "\"reads_miss\": " + std::to_string(m_prefetch_reads_miss) + ","
297  "\"bytes_used\": " + std::to_string(m_prefetch_bytes_used) +
298  "}}";
299 }
299 
// Starts an asynchronous open of `url`. In full-download mode for a
// non-write open this begins a GET of the whole object; otherwise an open
// operation is queued. `handler` is invoked with the final result.
// NOTE(review): several listing lines are missing from this extract (the
// return-type line, the `flags` parameter line 302, and the statements after
// some log calls: 309, 340, 394, 403) — restore them from the original source.
301 File::Open(const std::string &url,
303  XrdCl::Access::Mode mode,
304  XrdCl::ResponseHandler *handler,
305  time_t timeout)
306 {
307  if (m_is_opened) {
308  m_logger->Error(kLogXrdClHttp, "URL %s already open", url.c_str());
// NOTE(review): listing line 309 is missing here (presumably an error return).
310  }
311 
312  // Note: workaround for a design flaw of the XrdCl API.
313  //
314  // Any properties we set on the file *prior* to opening it are sent to the
315  // XrdCl base implementation, not the plugin object. Hence, they are effectively
316  // ignored because the later `GetProperty` accesses a different object. We want
317  // the SetProperty calls to take effect because they are needed for successfully
318  // `Open`ing the file. There's no way to "setup the plugin", "set properties", and
319  // then "open file" because the first and third operations are part of the same API
320  // call. We thus allow the caller to trigger the plugin loading by doing a special
321  // `Open` call (flags set to Compress, access mode None) that is a no-op.
322  //
323  // Contrast the XrdCl::File plugin loading style with XrdCl::Filesystem; the latter
324  // gets a target URL on construction, before any operations are done, allowing
325  // the `SetProperty` to work.
326  if ((flags == XrdCl::OpenFlags::Compress) && (mode == XrdCl::Access::None) &&
327  (handler == nullptr) && (timeout == 0))
328  {
329  return XrdCl::XRootDStatus();
330  }
331 
332  m_open_flags = flags;
333 
// Start from the default header timeout; it is replaced below by the value
// derived from the URL's `xrdclhttp.timeout` parameter.
334  m_header_timeout.tv_nsec = m_default_header_timeout.tv_nsec;
335  m_header_timeout.tv_sec = m_default_header_timeout.tv_sec;
336  auto parsed_url = XrdCl::URL();
337  parsed_url.SetPort(0);
338  if (!parsed_url.FromString(url)) {
339  m_logger->Error(kLogXrdClHttp, "Failed to parse provided URL as a valid URL: %s", url.c_str());
// NOTE(review): listing line 340 is missing here (presumably an error return).
341  }
// Parse the requested timeout and write the effective value back into the
// URL parameters.
342  auto pm = parsed_url.GetParams();
343  auto iter = pm.find("xrdclhttp.timeout");
344  std::string timeout_string = (iter == pm.end()) ? "" : iter->second;
345  m_header_timeout = ParseHeaderTimeout(timeout_string, m_logger);
346  pm["xrdclhttp.timeout"] = XrdClHttp::MarshalDuration(m_header_timeout);
347  parsed_url.SetParams(pm);
// `oss.asize`, when present, must be a complete non-negative integer; it is
// stored in m_asize and stripped from the URL parameters.
348  iter = pm.find("oss.asize");
349  if (iter != pm.end()) {
350  off_t asize;
351  auto ec = std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), asize);
352  if ((ec.ec == std::errc()) && (ec.ptr == iter->second.c_str() + iter->second.size()) && asize >= 0) {
353  m_asize = asize;
354  } else {
355  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Unable to parse oss.asize to a valid size");
356  }
357  pm.erase(iter);
358  parsed_url.SetParams(pm);
359  }
360 
361  m_url = parsed_url.GetURL();
362  m_last_url = "";
363  m_url_current = "";
364 
365  auto ts = GetHeaderTimeout(timeout);
366 
// A fresh default prefetch handler is created on every open; prefetching is
// switched on immediately only in full-download mode.
367  bool full_download = m_full_download.load(std::memory_order_relaxed);
368  m_default_prefetch_handler.reset(new PrefetchDefaultHandler(*this));
369  if (full_download) {
370  m_default_prefetch_handler->m_prefetch_enabled.store(true, std::memory_order_relaxed);
371  }
372 
// Full-download mode (read-only opens): the open itself is a zero-byte
// prefetch read starting at offset 0 with an unbounded prefetch size.
373  if (full_download && !(flags & XrdCl::OpenFlags::Write)) {
374  m_logger->Debug(kLogXrdClHttp, "Opening %s in full download mode", m_url.c_str());
375 
376  handler = new OpenFullDownloadResponseHandler(&m_is_opened, SendResponseInfo(), handler);
377  m_prefetch_size = std::numeric_limits<off_t>::max();
378  auto [status, ok] = ReadPrefetch(0, 0, nullptr, handler, timeout, false);
// Both branches return `status`; the `!ok` branch additionally logs it.
379  if (ok) {
380  return status;
381  } else {
382  m_logger->Error(kLogXrdClHttp, "Failed to start prefetch of data at open (URL %s): %s", m_url.c_str(), status.ToString().c_str());
383  return status;
384  }
385  }
386 
387 
388  m_logger->Debug(kLogXrdClHttp, "Opening %s (with timeout %lld)", m_url.c_str(), (long long) timeout);
389 
390  // This response handler sets the m_is_opened flag to true if the open callback is successfully invoked.
391  handler = new OpenResponseHandler(&m_is_opened, handler);
392 
393  std::shared_ptr<XrdClHttp::CurlOpenOp> openOp(
// NOTE(review): listing line 394 (the `new ...(` expression) is missing here.
395  handler, GetCurrentURL(), ts, m_logger, this, SendResponseInfo(), GetConnCallout(),
396  &m_default_header_callout
397  )
398  );
399  try {
400  m_queue->Produce(std::move(openOp));
401  } catch (...) {
402  m_logger->Warning(kLogXrdClHttp, "Failed to add open op to queue");
// NOTE(review): listing line 403 is missing here (presumably an error return).
404  }
405 
406  return XrdCl::XRootDStatus();
407 }
408 
// Close operation (judging by its log messages). Finishes any in-progress
// upload, or creates a zero-length object when a write-mode file was never
// written to, then reports the result to `handler`.
// NOTE(review): the start of the signature (listing lines 409-410) and lines
// 415, 428, 438 and 461 are missing from this extract — restore them from the
// original source.
411  time_t timeout)
412 {
413  if (!m_is_opened) {
414  m_logger->Error(kLogXrdClHttp, "Cannot close. URL isn't open");
// NOTE(review): listing line 415 is missing here (presumably an error return).
416  }
417  m_is_opened = false;
418 
419  std::unique_ptr<XrdCl::XRootDStatus> status(new XrdCl::XRootDStatus{});
// Case 1: an upload is in progress and has not failed.
420  if (m_put_op && !m_put_op->HasFailed()) {
421  auto put_size = m_put_offset.load(std::memory_order_relaxed);
// NOTE(review): the outer condition already requires put_size == m_asize, so
// the inner `else` (partial size) branch can never run. The outer test may
// have been meant to be just `m_asize >= 0` — confirm.
422  if (m_asize >= 0 && put_size == m_asize) {
423  if (put_size == m_asize) {
424  m_logger->Debug(kLogXrdClHttp, "Closing a finished file %s", m_url.c_str());
425  } else {
426  m_logger->Debug(kLogXrdClHttp, "Closing a file %s with partial size (offset %llu, expected %lld)",
427  m_url.c_str(), static_cast<unsigned long long>(put_size), static_cast<long long>(m_asize));
// NOTE(review): listing line 428 (the start of this statement) is missing.
429  0, "Cannot close file with partial size"));
430  }
431  } else {
// An empty (nullptr, 0) write is queued to finish the upload; `handler` is
// passed along to that queued write.
432  m_logger->Debug(kLogXrdClHttp, "Flushing final write buffer on close");
433  auto put_handler = m_put_handler.load(std::memory_order_acquire);
434  if (put_handler) {
435  return put_handler->QueueWrite(std::make_pair(nullptr, 0), handler);
436  } else {
437  m_logger->Error(kLogXrdClHttp, "Internal state error - put operation ongoing without handle");
// NOTE(review): listing line 438 is missing here.
439  }
440  }
// Case 2: opened for writing but nothing was ever written — upload an empty
// body so that the object is created.
441  } else if (!m_put_op && m_open_flags & XrdCl::OpenFlags::Write) {
442  timespec ts;
443  timespec_get(&ts, TIME_UTC);
444  ts.tv_sec += timeout;
445  m_asize = 0;
446  auto handler_wrapper = new PutResponseHandler(new CloseCreateHandler(handler));
447  m_put_handler.store(handler_wrapper, std::memory_order_release);
448  m_put_op.reset(new XrdClHttp::CurlPutOp(
449  handler_wrapper, m_default_put_handler, m_url, nullptr, 0, ts, m_logger,
450  GetConnCallout(), &m_default_header_callout
451  ));
452  handler_wrapper->SetOp(m_put_op);
453  m_url_current = "";
454  m_last_url = "";
455  m_logger->Debug(kLogXrdClHttp, "Creating a zero-sized object at %s for close", m_url.c_str());
456  try {
457  m_queue->Produce(m_put_op);
458  } catch (...) {
459  m_put_handler.store(nullptr, std::memory_order_release);
460  m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
// NOTE(review): listing line 461 is missing here (presumably an error return).
462  }
463  return {};
464  }
465 
// Nothing left to flush: report the status synchronously.
466  m_logger->Debug(kLogXrdClHttp, "Closed %s", m_url.c_str());
467  m_url_current = "";
468  m_last_url = "";
469 
470  if (handler) {
471  handler->HandleResponse(status.release(), nullptr);
472  }
473  return XrdCl::XRootDStatus();
474 }
475 
// Answers a stat request from the "ContentLength" property of this file; no
// network request is made here. The handler is invoked synchronously with a
// StatInfo whose size is the content length and whose time is "now".
// NOTE(review): the return-type line (476) and the statements after each
// error log (483, 490, 496, 500) are missing from this extract — restore them
// from the original source.
477 File::Stat(bool /*force*/,
478  XrdCl::ResponseHandler *handler,
479  time_t timeout)
480 {
481  if (!m_is_opened) {
482  m_logger->Error(kLogXrdClHttp, "Cannot stat. URL isn't open");
484  }
485 
486  std::string content_length_str;
487  int64_t content_length;
488  if (!GetProperty("ContentLength", content_length_str)) {
489  m_logger->Error(kLogXrdClHttp, "Content length missing for %s", m_url.c_str());
491  }
492  try {
493  content_length = std::stoll(content_length_str);
494  } catch (...) {
495  m_logger->Error(kLogXrdClHttp, "Content length not an integer for %s", m_url.c_str());
497  }
498  if (content_length < 0) {
499  m_logger->Error(kLogXrdClHttp, "Content length negative for %s", m_url.c_str());
501  }
502 
503  m_logger->Debug(kLogXrdClHttp, "Successful stat operation on %s (size %lld)", m_url.c_str(), static_cast<long long>(content_length));
504  auto stat_info = new XrdCl::StatInfo("nobody", content_length,
505  XrdCl::StatInfo::Flags::IsReadable, time(NULL));
506  auto obj = new XrdCl::AnyObject();
507  obj->Set(stat_info);
508 
// NOTE(review): `handler` is dereferenced without a null check, unlike the
// close path above — confirm callers always pass one.
509  handler->HandleResponse(new XrdCl::XRootDStatus(), obj);
510  return XrdCl::XRootDStatus();
511 }
512 
// Fcntl-style query (judging by its log messages). `arg` is read as the
// decimal text of an XrdCl::QueryCode. Only the XAttr query is answered: the
// reply is a JSON document built from the "ETag" and "Cache-Control"
// properties. Every other code is logged as unsupported.
// NOTE(review): the start of the signature (listing lines 513-514), the error
// returns (519, 602), the declaration of `status` (569) and most `case` labels
// (571, 575, 578, 581, 585, 589) are missing from this extract — restore them
// from the original source.
515  time_t timeout)
516 {
517  if (!m_is_opened) {
518  m_logger->Error(kLogXrdClHttp, "Cannot run fcntl. URL isn't open");
520  }
521 
// NOTE(review): `obj` is not freed on the `return status;` path below —
// looks like a leak; confirm.
522  auto obj = new XrdCl::AnyObject();
523  std::string as = arg.ToString();
524  try
525  {
526  XrdCl::QueryCode::Code code = (XrdCl::QueryCode::Code)std::stoi(as);
527  if (code == XrdCl::QueryCode::XAttr)
528  {
529  nlohmann::json xatt;
530  std::string etagRes;
531  if (GetProperty("ETag", etagRes))
532  {
533  xatt["ETag"] = etagRes;
534  }
535  std::string cc;
536  if (GetProperty("Cache-Control", cc))
537  {
538  if (cc.find("must-revalidate") != std::string::npos)
539  {
540  xatt["revalidate"] = true;
541  }
// "expire" is set to now + max-age seconds.
542  size_t fm = cc.find("max-age=");
543  if (fm != std::string::npos)
544  {
// NOTE(review): "max-age=" is 8 characters long, so adding 9 skips the first
// digit of the value (e.g. "max-age=3600" would be read from "600") —
// confirm and fix.
545  fm += 9; // idx of the first character after the max-age= match
546  for (size_t i = fm; i < cc.length(); i++)
547  {
548  if (!std::isdigit(cc[i]))
549  {
// NOTE(review): std::string::substr takes (pos, count); the count passed here
// is the end index `i`, not `i - fm`. Also, when the digits run to the end of
// the string the loop never reaches this branch, so "expire" is not set.
550  std::string sa = cc.substr(fm, i);
551  long int a = std::stol(sa);
552  time_t t = time(NULL) + a;
553  xatt["expire"] = t;
554  break;
555  }
556  }
557  }
558  }
559  XrdCl::Buffer *respBuff = new XrdCl::Buffer();
560  m_logger->Debug(kLogXrdClHttp, "Fcntl content %s", xatt.dump().c_str());
561  respBuff->FromString(xatt.dump());
562  obj->Set(respBuff);
563  }
564  //
565  // Query codes supported by XrdCl::File::Fcntl
566  //
567  else {
568  std::string msg;
570  switch (code) {
572  msg = "Server status query not supported.";
573  break;
574  case XrdCl::QueryCode::Checksum: // fallthrough
576  msg = "Checksum query not supported.";
577  break;
579  msg = "Server configuration query not supported.";
580  break;
582  msg = "Local space stats query not supported.";
583  break;
584  case XrdCl::QueryCode::Opaque: // fallthrough
586  // XrdCl implementation dependent
587  msg = "Opaque query not supported.";
588  break;
590  msg = "Prepare status query not supported.";
591  break;
592  default:
593  msg = "Invalid information query type code";
594  }
595  m_logger->Error(kLogXrdClHttp, "%s", msg.c_str());
596  return status;
597  }
598  }
// Reached when std::stoi/std::stol (or anything else above) throws.
599  catch (const std::exception& e)
600  {
601  m_logger->Warning(kLogXrdClHttp, "Failed to parse query code %s", e.what());
603  }
604 
605  handler->HandleResponse(new XrdCl::XRootDStatus(), obj);
606  return XrdCl::XRootDStatus();
607 }
608 
// Reads `size` bytes at `offset` into `buffer`, reporting to `handler`.
// The read is first offered to the prefetch machinery; only if that declines
// (and the file is not in full-download mode) is a dedicated read operation
// queued.
// NOTE(review): the return-type line (609), the statement after the "isn't
// open" log (618), the `new ...(` expression (647) and the statement after the
// queue failure log (657) are missing from this extract — restore them from
// the original source.
610 File::Read(uint64_t offset,
611  uint32_t size,
612  void *buffer,
613  XrdCl::ResponseHandler *handler,
614  time_t timeout)
615 {
616  if (!m_is_opened) {
617  m_logger->Error(kLogXrdClHttp, "Cannot read. URL isn't open");
619  }
// `ok` means the prefetch path took responsibility for this read, whether or
// not it succeeded; `status` is then the final answer.
620  auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout, false);
621  if (ok) {
622  if (status.IsOK()) {
623  m_logger->Debug(kLogXrdClHttp, "Read %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size, static_cast<long long>(offset));
624  } else {
625  m_logger->Warning(kLogXrdClHttp, "Read %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size, static_cast<long long>(offset), status.GetErrorMessage().c_str());
626  }
627  return status;
628  } else if (m_full_download.load(std::memory_order_relaxed)) {
// Full-download mode and the prefetch declined. If the download has finished
// and the caller is reading exactly at the current prefetch offset, answer
// with a zero-length chunk; any other read is rejected as non-sequential.
629  std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
630  if (m_prefetch_op && m_prefetch_op->IsDone() && (static_cast<off_t>(offset) == m_prefetch_offset.load(std::memory_order_acquire))) {
631  if (handler) {
632  auto ci = new XrdCl::ChunkInfo(offset, 0, buffer);
633  auto obj = new XrdCl::AnyObject();
634  obj->Set(ci);
635  handler->HandleResponse(new XrdCl::XRootDStatus{}, obj);
636  }
637  return XrdCl::XRootDStatus{};
638  }
639  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Non-sequential read detected when in full-download mode");
640  }
641 
// Ordinary path: queue a standalone read operation for this range.
642  auto ts = GetHeaderTimeout(timeout);
643  auto url = GetCurrentURL();
644  m_logger->Debug(kLogXrdClHttp, "Read %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size, static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
645 
646  std::shared_ptr<XrdClHttp::CurlReadOp> readOp(
648  handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, size),
649  static_cast<char*>(buffer), size, m_logger,
650  GetConnCallout(), &m_default_header_callout
651  )
652  );
653  try {
654  m_queue->Produce(std::move(readOp));
655  } catch (...) {
656  m_logger->Warning(kLogXrdClHttp, "Failed to add read op to queue");
658  }
659 
660  return XrdCl::XRootDStatus();
661 }
662 
// Tries to serve a read through the prefetch operation.
//
// Returns (status, handled). When `handled` is false the prefetch path
// declined and the caller must issue the read some other way; when true,
// `status` is the outcome of starting or joining the prefetch.
//
// isPgRead: when true, `handler` is wrapped so it receives a PageInfo result.
// NOTE(review): listing lines 712 and 718 (the two `new ...(` expressions that
// choose the operation type) are missing from this extract — restore them from
// the original source.
663 std::tuple<XrdCl::XRootDStatus, bool>
664 File::ReadPrefetch(uint64_t offset, uint64_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout, bool isPgRead)
665 {
666  // Check if prefetching is enabled; if not, return early.
667  auto prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled.load(std::memory_order_relaxed);
668  if (!prefetch_enabled) {
669  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
670  m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to prefetching being disabled", isPgRead ? "Pg": "");
671  return std::make_tuple(XrdCl::XRootDStatus{}, false);
672  }
// Re-check under the prefetch mutex.
673  std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
674  if (m_prefetch_size == -1) {
675  m_logger->Debug(kLogXrdClHttp, "%sRead prefetch skipping due to unknown file size", isPgRead ? "Pg": "");
// NOTE(review): the miss counter is incremented here and again in the
// `!prefetch_enabled` branch just below, so this path counts one read as two
// misses — confirm whether that is intended.
676  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
677  m_default_prefetch_handler->m_prefetch_enabled = false;
678  }
679  prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled;
680  if (!prefetch_enabled) {
681  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
682  return std::make_tuple(XrdCl::XRootDStatus{}, false);
683  }
684 
685  if (isPgRead) {
686  handler = new PgReadResponseHandler(handler);
687  }
688 
689  auto url = GetCurrentURL();
// No prefetch operation yet: this read starts one.
690  if (!m_prefetch_op) {
691  auto ts = GetHeaderTimeout(timeout);
692  if (m_prefetch_size == INT64_MAX) {
693  m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch full object)", isPgRead ? "Pg" : "", url.c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
694  } else {
695  m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch of size %lld)", isPgRead ? "Pg" : "", url.c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec), static_cast<long long>(m_prefetch_size));
696  }
697 
698  try {
699  // Note we don't set m_last_prefetch_handler here; the constructor will do this automatically if necessary.
700  new PrefetchResponseHandler(*this, offset, size, &m_prefetch_offset, static_cast<char *>(buffer), handler, nullptr, timeout);
701  } catch (std::runtime_error &exc) {
702  m_logger->Warning(kLogXrdClHttp, "Failed to create prefetch response handler: %s", exc.what());
703  m_default_prefetch_handler->m_prefetch_enabled = false;
704  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
705  return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
706  }
707 
708  // If we are prefetching as part of an open (i.e., a "full download"), there's special handling logic
709  // to pass along the response headers as file properties.
710  m_prefetch_op.reset(
711  m_is_opened ?
713  m_last_prefetch_handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, m_prefetch_size),
714  static_cast<char*>(buffer), size, m_logger,
715  GetConnCallout(), &m_default_header_callout
716  )
717  :
719  *this, m_last_prefetch_handler, m_default_prefetch_handler, url, ts,
720  std::make_pair(offset, m_prefetch_size), static_cast<char*>(buffer), size, m_logger,
721  GetConnCallout(), &m_default_header_callout
722  )
723  );
// The mutex is released before the counters are updated and the operation is
// queued; it is re-acquired only on the failure path below.
724  lock.unlock();
725  m_prefetch_count.fetch_add(1, std::memory_order_relaxed);
726  m_prefetch_reads_hit.fetch_add(1, std::memory_order_relaxed);
727  m_prefetch_offset.store(offset + size, std::memory_order_release);
728  try {
729  m_queue->Produce(m_prefetch_op);
730  } catch (...) {
731  m_logger->Warning(kLogXrdClHttp, "Failed to add prefetch read op to queue");
732  lock.lock();
733  m_prefetch_op.reset();
734  m_default_prefetch_handler->m_prefetch_enabled = false;
735  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
736  return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
737  }
738  return std::make_tuple(XrdCl::XRootDStatus{}, true);
739  }
740  if (m_prefetch_op->IsDone()) {
741  // Prefetch operation has completed (maybe failed); cannot re-use it.
742  m_default_prefetch_handler->m_prefetch_enabled = false;
743  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
744  m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to prefetching being already complete", isPgRead ? "Pg": "");
745  return std::make_tuple(XrdCl::XRootDStatus{}, false);
746  }
747 
// Join the ongoing prefetch only if this read starts exactly at the current
// prefetch offset; the compare-exchange advances that offset by `size` in the
// same step.
748  auto expected_offset = static_cast<off_t>(offset);
749  if (!m_prefetch_offset.compare_exchange_strong(expected_offset, static_cast<off_t>(offset + size), std::memory_order_acq_rel)) {
750  // Out-of-order read; can't handle the prefetch.
751  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
752  m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to out-of-order reads (requested %lld; current offset %lld)", isPgRead ? "Pg": "", static_cast<long long>(offset), static_cast<long long>(expected_offset));
753  return std::make_tuple(XrdCl::XRootDStatus{}, false);
754  }
755  if (m_logger->GetLevel() >= XrdCl::Log::LogLevel::DebugMsg) {
756  m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld; using ongoing prefetch)", isPgRead ? "Pg" : "", GetCurrentURL().c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset));
757  }
758  try {
759  // Notice we don't set m_last_prefetch_handler here; as soon as the constructor is invoked, another thread could have
760  // invoked the handler's callback and deleted it.
761  new PrefetchResponseHandler(*this, offset, size, &m_prefetch_offset, static_cast<char *>(buffer), handler, &lock, timeout);
762  } catch (std::runtime_error &exc) {
763  m_logger->Warning(kLogXrdClHttp, "Failed to create prefetch response handler: %s", exc.what());
764  m_default_prefetch_handler->m_prefetch_enabled = false;
765  m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
766  return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
767  }
768 
769  return std::make_tuple(XrdCl::XRootDStatus{}, true);
770 }
771 
// Queues a vector read of `chunks`, reporting to `handler`. Rejected in
// full-download mode; an empty chunk list is answered immediately with an
// empty VectorReadInfo.
// NOTE(review): the return-type line (772), the statement after the "isn't
// open" log (780), the `new ...(` expression (801) and the statement after the
// queue failure log (809) are missing from this extract — restore them from
// the original source.
773 File::VectorRead(const XrdCl::ChunkList &chunks,
774  void *buffer,
775  XrdCl::ResponseHandler *handler,
776  time_t timeout )
777 {
778  if (!m_is_opened) {
779  m_logger->Error(kLogXrdClHttp, "Cannot do vector read: URL isn't open");
781  } else if (m_full_download.load(std::memory_order_relaxed)) {
782  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Only sequential reads are supported when in full-download mode");
783  }
// Nothing to read: answer synchronously without queueing an operation.
784  if (chunks.empty()) {
785  if (handler) {
786  auto status = new XrdCl::XRootDStatus();
787  auto vr = std::make_unique<XrdCl::VectorReadInfo>();
788  vr->SetSize(0);
789  auto obj = new XrdCl::AnyObject();
790  obj->Set(vr.release());
791  handler->HandleResponse(status, obj);
792  }
793  return XrdCl::XRootDStatus();
794  }
795 
796  auto ts = GetHeaderTimeout(timeout);
797  auto url = GetCurrentURL();
798  m_logger->Debug(kLogXrdClHttp, "Read %s (%lld chunks; first chunk is %u bytes at offset %lld with timeout %lld)", url.c_str(), static_cast<long long>(chunks.size()), static_cast<unsigned>(chunks[0].GetLength()), static_cast<long long>(chunks[0].GetOffset()), static_cast<long long>(ts.tv_sec));
799 
800  std::shared_ptr<XrdClHttp::CurlVectorReadOp> readOp(
802  handler, url, ts, chunks, m_logger, GetConnCallout(), &m_default_header_callout
803  )
804  );
805  try {
806  m_queue->Produce(std::move(readOp));
807  } catch (...) {
808  m_logger->Warning(kLogXrdClHttp, "Failed to add vector read op to queue");
810  }
811 
812  return XrdCl::XRootDStatus();
813 }
814 
// Appends `size` bytes from `buffer` to the upload for this file. Writes must
// be strictly sequential: the first must be at offset 0 and each later one
// must start where the previous one ended. The first write creates and queues
// the PUT operation; later writes are queued on the existing put handler.
// NOTE(review): the return-type line (815), the statement after the "isn't
// open" log (824) and the statement after the queue failure log (861) are
// missing from this extract — restore them from the original source.
816 File::Write(uint64_t offset,
817  uint32_t size,
818  const void *buffer,
819  XrdCl::ResponseHandler *handler,
820  time_t timeout)
821 {
822  if (!m_is_opened) {
823  m_logger->Error(kLogXrdClHttp, "Cannot write: URL isn't open");
825  } else if (m_full_download.load(std::memory_order_relaxed)) {
826  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Only sequential reads are supported when in full-download mode");
827  }
828  m_default_prefetch_handler->DisablePrefetch();
829 
830  auto ts = GetHeaderTimeout(timeout);
831  auto url = GetCurrentURL();
832  m_logger->Debug(kLogXrdClHttp, "Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size, static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
833 
834  auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
// No put handler yet: this is (or is racing to be) the first write.
835  if (!handler_wrapper) {
836  handler_wrapper = new PutResponseHandler(handler);
837  PutResponseHandler *expected_value = nullptr;
// Install the new handler only if nobody else has; if the exchange fails,
// `expected_value` holds the handler that was installed first and the write
// is queued on it instead.
838  if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
839  delete handler_wrapper;
840  return expected_value->QueueWrite(std::make_pair(buffer, size), handler);
841  }
842 
843  if (offset != 0) {
844  m_put_handler.store(nullptr, std::memory_order_release);
845  delete handler_wrapper;
846  m_logger->Warning(kLogXrdClHttp, "Cannot start PUT operation at non-zero offset");
847  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "HTTP uploads must start at offset 0");
848  }
849  m_put_op.reset(new XrdClHttp::CurlPutOp(
850  handler_wrapper, m_default_put_handler, url, static_cast<const char*>(buffer), size, ts, m_logger,
851  GetConnCallout(), &m_default_header_callout
852  ));
853  handler_wrapper->SetOp(m_put_op);
854  m_put_offset.fetch_add(size, std::memory_order_acq_rel);
855  try {
856  m_queue->Produce(m_put_op);
857  } catch (...) {
858  m_put_handler.store(nullptr, std::memory_order_release);
859  delete handler_wrapper;
860  m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
862  }
863  return XrdCl::XRootDStatus();
864  }
865 
// Subsequent write: advance the offset first, then undo the advance if the
// caller's offset did not match the previous end of data.
866  auto old_offset = m_put_offset.fetch_add(size, std::memory_order_acq_rel);
867  if (static_cast<off_t>(offset) != old_offset) {
868  m_put_offset.fetch_sub(size, std::memory_order_acq_rel);
869  m_logger->Warning(kLogXrdClHttp, "Requested write offset at %lld does not match current file descriptor offset at %lld",
870  static_cast<long long>(offset), static_cast<long long>(old_offset));
871  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "Requested write offset does not match current offset");
872  }
873  return handler_wrapper->QueueWrite(std::make_pair(buffer, size), handler);
874 }
875 
877 File::Write(uint64_t offset,
878  XrdCl::Buffer &&buffer,
879  XrdCl::ResponseHandler *handler,
880  time_t timeout)
881 {
882  if (!m_is_opened) {
883  m_logger->Error(kLogXrdClHttp, "Cannot write: URL isn't open");
885  }
886  m_default_prefetch_handler->DisablePrefetch();
887 
888  auto ts = GetHeaderTimeout(timeout);
889  auto url = GetCurrentURL();
890  m_logger->Debug(kLogXrdClHttp, "Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), static_cast<int>(buffer.GetSize()), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
891 
892  auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
893  if (!handler_wrapper) {
894  handler_wrapper = new PutResponseHandler(handler);
895  PutResponseHandler *expected_value = nullptr;
896  if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
897  delete handler_wrapper;
898  return expected_value->QueueWrite(std::move(buffer), handler);
899  }
900 
901  if (offset != 0) {
902  m_put_handler.store(nullptr, std::memory_order_release);
903  delete handler_wrapper;
904  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "HTTP uploads must start at offset 0");
905  }
906  m_put_op.reset(new XrdClHttp::CurlPutOp(
907  handler_wrapper, m_default_put_handler, url, std::move(buffer), ts, m_logger,
908  GetConnCallout(), &m_default_header_callout
909  ));
910  handler_wrapper->SetOp(m_put_op);
911  m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
912  try {
913  m_queue->Produce(m_put_op);
914  } catch (...) {
915  m_put_handler.store(nullptr, std::memory_order_release);
916  delete handler_wrapper;
917  m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
919  }
920  return XrdCl::XRootDStatus();
921  }
922 
923  auto old_offset = m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
924  if (static_cast<off_t>(offset) != old_offset) {
925  m_put_offset.fetch_sub(buffer.GetSize(), std::memory_order_acq_rel);
926  m_logger->Warning(kLogXrdClHttp, "Requested write offset at %lld does not match current file descriptor offset at %lld",
927  static_cast<long long>(offset), static_cast<long long>(old_offset));
928  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "Requested write offset does not match current offset");
929  }
930  return handler_wrapper->QueueWrite(std::move(buffer), handler);
931 }
932 
934 File::PgRead(uint64_t offset,
935  uint32_t size,
936  void *buffer,
937  XrdCl::ResponseHandler *handler,
938  time_t timeout)
939 {
940  if (!m_is_opened) {
941  m_logger->Error(kLogXrdClHttp, "Cannot pgread. URL isn't open");
943  }
944  auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout, true);
945  if (ok) {
946  if (status.IsOK()) {
947  m_logger->Debug(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size, static_cast<long long>(offset));
948  } else {
949  m_logger->Warning(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size, static_cast<long long>(offset), status.GetErrorMessage().c_str());
950  }
951  return status;
952  } else if (m_full_download.load(std::memory_order_relaxed)) {
953  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Non-sequential read detected when in full-download mode");
954  }
955 
956  auto ts = GetHeaderTimeout(timeout);
957  auto url = GetCurrentURL();
958  m_logger->Debug(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld)", url.c_str(), size, static_cast<long long>(offset));
959 
960  std::shared_ptr<XrdClHttp::CurlPgReadOp> readOp(
962  handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, size),
963  static_cast<char*>(buffer), size, m_logger,
964  GetConnCallout(), &m_default_header_callout
965  )
966  );
967 
968  try {
969  m_queue->Produce(std::move(readOp));
970  } catch (...) {
971  m_logger->Warning(kLogXrdClHttp, "Failed to add read op to queue");
973  }
974 
975  return XrdCl::XRootDStatus();
976 }
977 
978 bool
979 File::IsOpen() const
980 {
981  return m_is_opened;
982 }
983 
984 bool
985 File::GetProperty(const std::string &name,
986  std::string &value) const
987 {
988  if (name == "CurrentURL") {
989  value = GetCurrentURL();
990  return true;
991  }
992 
993  if (name == "IsPrefetching") {
994  value = m_default_prefetch_handler->IsPrefetching() ? "true" : "false";
995  return true;
996  }
997 
998  std::shared_lock lock(m_properties_mutex);
999  if (name == "LastURL") {
1000  value = m_last_url;
1001  return true;
1002  }
1003 
1004  const auto p = m_properties.find(name);
1005  if (p == std::end(m_properties)) {
1006  return false;
1007  }
1008 
1009  value = p->second;
1010  return true;
1011 }
1012 
1013 bool File::SendResponseInfo() const {
1014  std::string val;
1015  return GetProperty(ResponseInfoProperty, val) && val == "true";
1016 }
1017 
1018 bool
1019 File::SetProperty(const std::string &name,
1020  const std::string &value)
1021 {
1022  if (name == "XrdClHttpHeaderCallout") {
1023  long long pointer;
1024  try {
1025  pointer = std::stoll(value, nullptr, 16);
1026  } catch (...) {
1027  pointer = 0;
1028  }
1029  m_header_callout.store(reinterpret_cast<XrdClHttp::HeaderCallout*>(pointer), std::memory_order_release);
1030  } else if (name == "XrdClHttpFullDownload") {
1031  if (value == "true") {
1032  auto prefetch_handler = m_default_prefetch_handler;
1033  if (prefetch_handler) {
1034  std::unique_lock lock(prefetch_handler->m_prefetch_mutex);
1035  prefetch_handler->m_prefetch_enabled.store(true, std::memory_order_relaxed);
1036  }
1037  m_full_download.store(true, std::memory_order_relaxed);
1038  }
1039  }
1040 
1041  std::unique_lock lock(m_properties_mutex);
1042 
1043  m_properties[name] = value;
1044  if (name == "LastURL") {
1045  m_last_url = value;
1046  m_url_current = "";
1047  }
1048  else if (name == "XrdClHttpQueryParam") {
1049  CalculateCurrentURL(value);
1050  }
1051  else if (name == "XrdClHttpMaintenancePeriod") {
1052  unsigned period;
1053  auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), period);
1054  if ((ec.ec == std::errc()) && (ec.ptr == value.c_str() + value.size()) && period > 0) {
1055  m_logger->Debug(kLogXrdClHttp, "Setting maintenance period to %u", period);
1057  }
1058  }
1059  else if (name == "XrdClHttpStallTimeout") {
1060  std::string errmsg;
1061  timespec ts;
1062  if (!ParseTimeout(value, ts, errmsg)) {
1063  m_logger->Debug(kLogXrdClHttp, "Failed to parse timeout value (%s): %s", value.c_str(), errmsg.c_str());
1064  } else {
1065  CurlOperation::SetStallTimeout(std::chrono::seconds{ts.tv_sec} + std::chrono::nanoseconds{ts.tv_nsec});
1066  }
1067  }
1068  else if (name == "XrdClHttpPrefetchSize") {
1069  off_t size;
1070  auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), size);
1071  if ((ec.ec == std::errc()) && (ec.ptr == value.c_str() + value.size())) {
1072  lock.unlock();
1073  std::unique_lock lock2(m_default_prefetch_handler->m_prefetch_mutex);
1074  m_prefetch_size = size;
1075  } else {
1076  m_logger->Debug(kLogXrdClHttp, "XrdClHttpPrefetchSize value (%s) was not parseable", value.c_str());
1077  }
1078  }
1079  return true;
1080 }
1081 
1082 const std::string
1083 File::GetCurrentURL() const {
1084  {
1085  std::shared_lock lock(m_properties_mutex);
1086 
1087  if (!m_url_current.empty()) {
1088  return m_url_current;
1089  } else if (m_url.empty() && m_last_url.empty()) {
1090  return "";
1091  }
1092  }
1093  std::unique_lock lock(m_properties_mutex);
1094 
1095  auto iter = m_properties.find("XrdClHttpQueryParam");
1096  if (iter == m_properties.end()) {
1097  return m_last_url.empty() ? m_url : m_last_url;
1098  }
1099  CalculateCurrentURL(iter->second);
1100 
1101  return m_url_current;
1102 }
1103 
1104 void
1105 File::CalculateCurrentURL(const std::string &value) const {
1106  const auto &last_url = m_last_url.empty() ? m_url : m_last_url;
1107  if (value.empty()) {
1108  m_url_current = last_url;
1109  } else {
1110  auto loc = last_url.find('?');
1111  if (loc == std::string::npos) {
1112  m_url_current = last_url + '?' + value;
1113  } else {
1114  XrdCl::URL url(last_url);
1115  auto map = url.GetParams(); // Make a copy of the pre-existing parameters
1116  url.SetParams(value); // Parse the new value
1117  auto update_map = url.GetParams();
1118  for (const auto &entry : map) {
1119  if (update_map.find(entry.first) == update_map.end()) {
1120  update_map[entry.first] = entry.second;
1121  }
1122  }
1123  bool first = true;
1124  std::stringstream ss;
1125  for (const auto &entry : update_map) {
1126  ss << (first ? "?" : "&") << entry.first << "=" << entry.second;
1127  first = false;
1128  }
1129  m_url_current = last_url.substr(0, loc) + ss.str();
1130  }
1131  }
1132 }
1133 
1134 File::PrefetchResponseHandler::PrefetchResponseHandler(
1135  File &parent, off_t offset, size_t size, std::atomic<off_t> *prefetch_offset, char *buffer,
1136  XrdCl::ResponseHandler *handler, std::unique_lock<std::mutex> *lock, time_t timeout
1137 )
1138  : m_parent(parent),
1139  m_handler(handler),
1140  m_buffer(buffer),
1141  m_size(size),
1142  m_offset(offset),
1143  m_prefetch_offset(prefetch_offset),
1144  m_timeout(timeout)
1145 {
1146  if (parent.m_last_prefetch_handler) {
1147  parent.m_last_prefetch_handler->m_next = this;
1148  parent.m_last_prefetch_handler = this;
1149  } else {
1150  m_parent.m_last_prefetch_handler = this;
1151  // If lock is nullptr, then we are guaranteed that this is called during the creation
1152  // of the m_prefetch_op and can skip this check.
1153  if (lock && m_parent.m_prefetch_op) {
1154  // If continuing the prefetch operation fails, then the failure callback
1155  // will be invoked; the callback requires the mutex and hence we need to unlock it
1156  // here to avoid a deadlock.
1157  lock->unlock();
1158  if (!parent.m_prefetch_op->Continue(parent.m_prefetch_op, this, buffer, size)) {
1159  lock->lock();
1160  // As soon as we unlock the lock, another thread could have used finished the
1161  // operation (which deletes the object); we must be careful to not touch the
1162  // object (reference m_*) in the meantime.
1163  if (parent.m_last_prefetch_handler == this)
1164  parent.m_last_prefetch_handler = nullptr;
1165  throw std::runtime_error("Failed to continue prefetch operation");
1166  }
1167  }
1168  }
1169 }
1170 
1171 void
1172 File::PrefetchResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
1173  // Ensure that we are deleted once the callback is done.
1174  std::unique_ptr<PrefetchResponseHandler> owner(this);
1175 
1176  bool mismatched_size = false;
1177  if (status) {
1178  if (status->IsOK() && response) {
1179  XrdCl::ChunkInfo *ci = nullptr;
1180  response->Get(ci);
1181  if (ci) {
1182  auto missing_bytes = m_size - ci->GetLength();
1183  if (missing_bytes) {
1184  mismatched_size = true;
1185  m_prefetch_offset->fetch_sub(missing_bytes, std::memory_order_relaxed);
1186  }
1187  m_prefetch_bytes_used.fetch_add(ci->GetLength(), std::memory_order_relaxed);
1188  }
1189  } else if (!status->IsOK()) {
1190  m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1191  }
1192  }
1193 
1194  PrefetchResponseHandler *next;
1195  {
1196  std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1197  next = m_next;
1198  }
1199  if (next) {
1200  if (status && status->IsOK() && !mismatched_size) {
1201  m_parent.m_prefetch_op->Continue(m_parent.m_prefetch_op, next, next->m_buffer, next->m_size);
1202  } else {
1203  // On failure resubmit subsequent operations.
1204  // All the subsequent ops also depend on us having the expected read length (otherwise the
1205  // file offsets are incorrect). If there's a mismatched read size (shorter actual bytes available
1206  // than what is originally requested), then that's another sign of potential issue and we disable
1207  // the prefetch mechanism.
1208  m_parent.m_default_prefetch_handler->DisablePrefetch();
1209  next->ResubmitOperation();
1210  }
1211  }
1212 
1213  {
1214  std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1215  if (m_parent.m_last_prefetch_handler == this) {
1216  m_parent.m_last_prefetch_handler = nullptr;
1217  }
1218  if (!status || !status->IsOK()) {
1219  m_parent.m_prefetch_op.reset();
1220  m_parent.m_default_prefetch_handler->m_prefetch_enabled = false;
1221  }
1222  }
1223 
1224  if (m_handler) m_handler->HandleResponse(status, response);
1225  else delete response;
1226 }
1227 
1228 void
1229 File::PrefetchResponseHandler::ResubmitOperation()
1230 {
1231  m_parent.m_logger->Debug(kLogXrdClHttp, "Resubmitting waiting prefetch operations as new reads due to prefetch failure");
1232  PrefetchResponseHandler *next = this;
1233  while (next) {
1234  auto cur = next;
1235  auto st = next->m_parent.Read(next->m_offset, next->m_size, next->m_buffer, next->m_handler, next->m_timeout);
1236  if (!st.IsOK() && next->m_handler) {
1237  next->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
1238  }
1239  {
1240  std::unique_lock lock(next->m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1241  next = next->m_next;
1242  }
1243  delete cur;
1244  }
1245 }
1246 
1247 void
1248 File::PrefetchDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
1249  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1250  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
1251  if (status && !status->IsOK()) {
1252  if ((status->code == XrdCl::errOperationExpired) && (status->GetErrorMessage().find("Transfer stalled for too long") != std::string::npos)) {
1253  m_prefetch_expired_count.fetch_add(1, std::memory_order_relaxed);
1254  m_logger->Debug(kLogXrdClHttp, "Prefetch data for %s went unused; disabling.", m_url.c_str());
1255  } else {
1256  m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1257  m_logger->Warning(kLogXrdClHttp, "Disabling prefetch of %s due to error: %s", m_url.c_str(), status->ToStr().c_str());
1258  }
1259  }
1260  DisablePrefetch();
1261 }
1262 
1263 void
1264 File::PutDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
1265  delete response;
1266  if (status) {
1267  m_logger->Warning(kLogXrdClHttp, "Failing future write calls due to error: %s", status->ToStr().c_str());
1268  delete status;
1269  }
1270 }
1271 
1272 std::shared_ptr<XrdClHttp::HeaderCallout::HeaderList>
1273 File::HeaderCallout::GetHeaders(const std::string &verb,
1274  const std::string &url,
1275  const HeaderList &headers)
1276 {
1277  auto parent_callout = m_parent.m_header_callout.load(std::memory_order_acquire);
1278  std::shared_ptr<std::vector<std::pair<std::string, std::string>>> result_headers;
1279  if (parent_callout != nullptr) {
1280  result_headers = parent_callout->GetHeaders(verb, url, headers);
1281  } else {
1282  result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1283  for (const auto & info : headers) {
1284  result_headers->emplace_back(info.first, info.second);
1285  }
1286  }
1287  if (m_parent.m_asize >= 0 && verb == "PUT") {
1288  if (!result_headers) {
1289  result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1290  }
1291  auto iter = std::find_if(result_headers->begin(), result_headers->end(),
1292  [](const auto &pair) { return !strcasecmp(pair.first.c_str(), "Content-Length"); });
1293  if (iter == result_headers->end()) {
1294  result_headers->emplace_back("Content-Length", std::to_string(m_parent.m_asize));
1295  }
1296  } else if (!result_headers) {
1297  result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1298  }
1299  return result_headers;
1300 }
1301 
1302 File::PutResponseHandler::PutResponseHandler(XrdCl::ResponseHandler *handler)
1303  : m_active_handler(handler)
1304 {}
1305 
1306 void
1307 File::PutResponseHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
1308 {
1309  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
1310  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1311 
1312  // Note: if the handler owns the file object (as in the case of Pelican's writeback
1313  // response handler), then the callback may cause the file to be deleted - and hence
1314  // this instance of PutResponseHandler to be deleted. However, if m_active is true,
1315  // the destructor will wait until it's set to false; that cannot occur until we clear
1316  // m_active (in ProcessQueue or in the cleanup path for pending writes).
1317  //
1318  // Hence, we must ensure that we clear m_active and run any queue logic before invoking
1319  // callback handlers, which may delete this object or generate work in other threads.
1320 
1321  XrdCl::ResponseHandler *current_handler = nullptr;
1322  if (!status->IsOK()) {
1323  // Fail remaining (pending) handlers with the same error
1324  // Any writes attempts by the client after failure are set
1325  // are prompty declined
1326  std::vector<XrdCl::ResponseHandler *> pending_handlers;
1327  {
1328  std::lock_guard<std::mutex> lg(m_mutex);
1329  current_handler = m_active_handler;
1330  for (auto &[_, h] : m_pending_writes) {
1331  if (h) pending_handlers.push_back(h);
1332  }
1333 
1334  m_pending_writes.clear();
1335  m_active = false;
1336  m_active_handler = nullptr;
1337  m_cv.notify_all();
1338  }
1339 
1340  XrdCl::XRootDStatus status_copy(*status);
1341  if (current_handler) {
1342  current_handler->HandleResponse(status.release(), response.release());
1343  }
1344 
1345  for (auto *h : pending_handlers) {
1346  h->HandleResponse(new XrdCl::XRootDStatus(status_copy), nullptr);
1347  }
1348  return;
1349  }
1350 
1351  current_handler = m_active_handler;
1352  if (ProcessQueue() && current_handler) {
1353  current_handler->HandleResponse(status.release(), response.release());
1354  }
1355 }
1356 
1358 File::PutResponseHandler::QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler)
1359 {
1360  if (m_op->HasFailed()) {
1361  auto sc = m_op->GetStatusCode();
1362  if (HTTPStatusIsError(sc)){
1363  auto httpErr = HTTPStatusConvert(sc);
1364  auto err_msg = m_op->GetCurlErrorMessage();
1365  if (err_msg.empty()) {
1366  err_msg = m_op->GetStatusMessage();
1367  }
1368  return XrdCl::XRootDStatus(XrdCl::stError, httpErr.first, httpErr.second, err_msg);
1369  }
1370  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Cannot continue writing to open file after error");
1371  }
1372  std::lock_guard<std::mutex> lg(m_mutex);
1373  if (!m_active) {
1374  m_active = true;
1375  m_active_handler = handler;
1376  if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1377  if (!m_op->Continue(m_op, this, std::move(std::get<XrdCl::Buffer>(buffer)))) {
1378  m_active = false;
1379  m_cv.notify_all();
1380  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation");
1381  }
1382  } else {
1383  auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1384  if (!m_op->Continue(m_op, this, static_cast<const char *>(buffer_info.first), buffer_info.second)) {
1385  m_active = false;
1386  m_cv.notify_all();
1387  return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation");
1388  }
1389  }
1390  } else {
1391  m_pending_writes.emplace_back(std::move(buffer), handler);
1392  }
1393  return XrdCl::XRootDStatus{};
1394 }
1395 
1396 // Start the next pending write operation.
1397 bool
1398 File::PutResponseHandler::ProcessQueue() {
1399  std::lock_guard<std::mutex> lg(m_mutex);
1400  if (m_pending_writes.empty()) {
1401  // No pending writes; mark the operation as inactive.
1402  m_active = false;
1403  m_active_handler = nullptr;
1404  m_cv.notify_all();
1405  return true;
1406  }
1407 
1408  // Start the next pending write.
1409  auto & [buffer, handler] = m_pending_writes.front();
1410  bool rv;
1411  m_active_handler = handler;
1412  if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1413  rv = m_op->Continue(m_op, this, std::move(std::get<XrdCl::Buffer>(buffer)));
1414  } else {
1415  auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1416  rv = m_op->Continue(m_op, this, static_cast<const char *>(buffer_info.first), buffer_info.second);
1417  }
1418  m_pending_writes.pop_front();
1419  if (!rv) {
1420  // The continuation failed; mark the operation as inactive and
1421  // invoke all pending handlers with the error.
1422  if (m_active_handler) {
1423  m_active_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation"), nullptr);
1424  }
1425  for (auto& [_, h] : m_pending_writes) {
1426  if (h) {
1427  h->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation"), nullptr);
1428  }
1429  }
1430  m_active = false;
1431  m_cv.notify_all();
1432  return false;
1433  }
1434  return true;
1435 }
1436 
1437 void
1438 File::PutResponseHandler::WaitForCompletion() {
1439  std::unique_lock lock(m_mutex);
1440  m_cv.wait(lock, [&]{return !m_active;});
1441 }
static std::string ts()
timestamp output for logging messages
Definition: XrdCephOss.cc:53
static void parent()
#define ResponseInfoProperty
nlohmann::json json
XrdOucString File
static void SetStallTimeout(int stall_interval)
static void SetMaintenancePeriod(unsigned maint)
struct timespec GetHeaderTimeout(time_t oper_timeout) const
virtual bool IsOpen() const override
virtual XrdCl::XRootDStatus Stat(bool force, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Open(const std::string &url, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode mode, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual bool GetProperty(const std::string &name, std::string &value) const override
static const struct timespec & GetMinimumHeaderTimeout()
virtual XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual ~File() noexcept
virtual XrdCl::XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual bool SetProperty(const std::string &name, const std::string &value) override
static const struct timespec & GetDefaultHeaderTimeout()
virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, time_t timeout) override
static std::string GetMonitoringJson()
virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger)
virtual XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
void FromString(const std::string str)
Fill the buffer from a string.
Definition: XrdClBuffer.hh:205
std::string ToString() const
Convert the buffer to a string.
Definition: XrdClBuffer.hh:215
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
bool HTTPStatusIsError(unsigned status)
std::string MarshalDuration(const struct timespec &timeout)
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint16_t errOSError
Definition: XrdClStatus.hh:61
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
Mode
Access mode.
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
Code
XRootD query request codes.
@ OpaqueFile
Implementation dependent.
@ XAttr
Query file extended attributes.
@ Opaque
Implementation dependent.
@ Config
Query server configuration.
@ Stats
Query server stats.
@ ChecksumCancel
Query file checksum cancellation.
@ Checksum
Query file checksum.
@ Space
Query logical space stats.
@ Prepare
Query prepare status.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97