XRootD
XrdClHttpOpRead.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpOps.hh"
22 
23 #include <XrdCl/XrdClLog.hh>
25 #include <XrdOuc/XrdOucCRC.hh>
26 #include <XrdSys/XrdSysPageSize.hh>
27 
28 using namespace XrdClHttp;
29 
30 CurlReadOp::CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
31  const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
32  char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout,
33  HeaderCallout *header_callout) :
34  CurlOperation(handler, url, timeout, logger, callout, header_callout),
35  m_default_handler(default_handler),
36  m_op(op),
37  m_buffer(buffer),
38  m_buffer_size(sz)
39  {}
40 
41 bool
42 CurlReadOp::Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
43 {
44  if (op.get() != this) {
45  m_logger->Debug(kLogXrdClHttp, "Interface error: must provide shared pointer to self");
46  Fail(XrdCl::errInternal, 0, "Interface error: must provide shared pointer to self");
47  return false;
48  }
49  m_handler = handler;
50  m_buffer = buffer;
51  m_buffer_size = buffer_size;
52  m_written = 0;
53 
54  if (!m_prefetch_buffer.empty()) {
55  auto prefetch_remaining = m_prefetch_buffer.size() - m_prefetch_buffer_offset;
56  auto to_copy = prefetch_remaining > buffer_size ? buffer_size : prefetch_remaining;
57  m_written += to_copy;
58  memcpy(buffer, m_prefetch_buffer.data() + m_prefetch_buffer_offset, to_copy);
59  m_prefetch_buffer_offset += to_copy;
60  if (m_prefetch_buffer_offset == m_prefetch_buffer.size()) {
61  m_prefetch_buffer.clear();
62  m_prefetch_buffer_offset = 0;
63  }
64  }
65 
66  // This handles the case where the transfer finished but its last WriteCallback
67  // produced more data than the client buffer could hold, the excess being stored
68  // in the prefetch buffer
69  // we just need to deliver the response without re-queuing to the continue queue
70  if (op->IsDone()) {
71  DeliverResponse();
72  } else {
73  try {
74  m_continue_queue->Produce(op);
75  } catch (...) {
76  Fail(XrdCl::errInternal, ENOMEM, "Failed to continue the curl operation");
77  return false;
78  }
79  }
80 
81  return true;
82 }
83 
84 bool
86 {
87  if (IsDone()) {
88  return false;
89  }
90  if (!m_curl) {
91  return false;
92  }
93 
94  CURLcode rc;
95  if ((rc = curl_easy_pause(m_curl.get(), CURLPAUSE_CONT)) != CURLE_OK) {
96  m_logger->Error(kLogXrdClHttp, "Failed to continue a paused handle: %s", curl_easy_strerror(rc));
97  return false;
98  }
99  SetPaused(false);
100  return m_curl.get();
101  }
102 
103 bool
105 {
106  if (!CurlOperation::Setup(curl, worker)) {return false;}
107 
108  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, CurlReadOp::WriteCallback);
109  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, this);
110 
111  // Note: range requests are inclusive of the end byte, meaning "bytes=0-1023" is a 1024-byte request.
112  // This is why we subtract '1' off the end.
113  if (m_op.second == 0) {
114  Success();
115  return true;
116  }
117  if (m_op.second >= 1024*1024) {
118  curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
119  }
120  else if (m_op.second >= 256*1024) {
121  curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 64*1024);
122  }
123  else if (m_op.second >= 128*1024) {
124  curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 32*1024);
125  }
126  // If the requested read size is UINT64_MAX, it means read the entire object;
127  // in this case, we do not set the Range header.
128  if (m_op.second != UINT64_MAX) {
129  auto range_req = "bytes=" + std::to_string(m_op.first) + "-" + std::to_string(m_op.first + m_op.second - 1);
130  m_headers_list.emplace_back("Range", range_req);
131  }
132 
133  return true;
134 }
135 
136 void
137 CurlReadOp::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
138 {
139  std::string custom_msg = msg;
140  SetDone(true);
141  if (m_handler == nullptr && m_default_handler == nullptr) {return;}
142  if (!custom_msg.empty()) {
143  m_logger->Debug(kLogXrdClHttp, "curl operation at offset %llu failed with message: %s%s", static_cast<long long unsigned>(m_op.first), msg.c_str(), m_err_msg.empty() ? "" : (", server message: " + m_err_msg).c_str());
144  custom_msg += " (read operation at offset " + std::to_string(static_cast<long long unsigned>(m_op.first)) + ")";
145  } else {
146  m_logger->Debug(kLogXrdClHttp, "curl operation at offset %llu failed with status code %d%s", static_cast<long long unsigned>(m_op.first), errNum, m_err_msg.empty() ? "" : (", server message: " + m_err_msg).c_str());
147  }
148  auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, custom_msg);
149  auto handle = m_handler;
150  m_handler = nullptr;
151  if (handle) handle->HandleResponse(status, nullptr);
152  else m_default_handler->HandleResponse(status, nullptr);
153 }
154 
155 void
156 CurlReadOp::DeliverResponse()
157 {
158  if (m_handler == nullptr) {return;}
159  auto handle = m_handler;
160  auto status = new XrdCl::XRootDStatus();
161 
162  auto chunk_info = new XrdCl::ChunkInfo(m_op.first + m_prefetch_object_offset, m_written, m_buffer);
163  m_prefetch_object_offset += m_written;
164  auto obj = new XrdCl::AnyObject();
165  obj->Set(chunk_info);
166 
167  // Reset the internal buffers to avoid writes to locations we do not own
168  m_buffer = nullptr;
169  m_buffer_size = 0;
170 
171  m_handler = nullptr;
172  // Note: As soon as this is invoked, another thread may continue and start to manipulate
173  // the CurlReadOp object. To avoid race conditions, all reads/writes to member data must
174  // be done *before* the callback is invoked.
175  handle->HandleResponse(status, obj);
176 }
177 
178 void
180 {
181  SetPaused(true);
182  if (m_handler == nullptr) {
183  m_logger->Warning(kLogXrdClHttp, "Get operation paused with no callback handler");
184  return;
185  }
186  DeliverResponse();
187 }
188 
189 void
191 {
192  SetDone(false);
193  if (m_handler == nullptr) {return;}
194  auto status = new XrdCl::XRootDStatus();
195  auto chunk_info = new XrdCl::ChunkInfo(m_op.first + m_prefetch_object_offset, m_written, m_buffer);
196  m_prefetch_object_offset += m_written;
197  auto obj = new XrdCl::AnyObject();
198  obj->Set(chunk_info);
199  auto handle = m_handler;
200  m_handler = nullptr;
201  handle->HandleResponse(status, obj);
202 }
203 
204 void
206 {
207  if (m_curl == nullptr) return;
208  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, nullptr);
209  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
210  curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
211  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
212  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
213  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
214  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
216 }
217 
218 size_t
219 CurlReadOp::WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
220 {
221  return static_cast<CurlReadOp*>(this_ptr)->Write(buffer, size * nitems);
222 }
223 
224 size_t
225 CurlReadOp::Write(char *buffer, size_t length)
226 {
227  //m_logger->Debug(kLogXrdClHttp, "Received a write of size %ld with offset %lld; total received is %ld; remaining is %ld", static_cast<long>(length), static_cast<long long>(m_op.first), static_cast<long>(length + m_written), static_cast<long>(m_op.second - length - m_written));
229  return FailCallback(kXR_ServerError, "Server responded with a multipart byterange which is not supported");
230  }
231  if (m_written == 0 && (m_headers.GetOffset() != m_op.first)) {
232  return FailCallback(kXR_ServerError, "Server did not return content with correct offset");
233  }
234  // If the operation failed, do not copy the body of the response into the buffer; it is likely
235  // an error message and not what we want to provide to the consumer buffer.
236  if (m_headers.GetStatusCode() > 299) {
237  // Record error message; prevent the server from spamming overly-long responses as we
238  // buffer them in memory.
239  if (m_err_msg.size() < 4*1024) {
240  m_err_msg.append(buffer, length);
241  }
242  UpdateBytes(length);
243  return length;
244  }
245  // The write callback is "all or nothing". Either you accept the whole thing (buffering
246  // in m_prefetch_buffer any data that the client-provided buffer is too small to accept)
247  // or you return CURL_WRITEFUNC_PAUSE and the delivery will be retried the next time the
248  // handle is unpaused.
249  //
250  // If `m_buffer` is nullptr, then it indicates we are unpaused while there is no ongoing
251  // File::Read operation; this typically happens when the transfer timeout occurs. Simply
252  // re-pause the transfer to go through the libcurl state machine and trigger the failure.
253  if (!m_buffer || (m_buffer_size == m_written)) {
254  Pause();
255  return CURL_WRITEFUNC_PAUSE;
256  }
257  UpdateBytes(length);
258  auto output_remaining = m_buffer_size - m_written;
259  auto larger_than_result_buffer = length > output_remaining;
260  auto to_copy = larger_than_result_buffer ? output_remaining : length;
261  memcpy(m_buffer + m_written, buffer, to_copy);
262  m_written += to_copy;
263  // We don't have enough space in the buffer to write the response and this is a single-shot
264  // read request
265  if ((m_op.second <= m_buffer_size) && larger_than_result_buffer) {
266  return FailCallback(kXR_ServerError, "Server sent back more data than requested");
267  } else if (larger_than_result_buffer) {
268  auto input_remaining = length - output_remaining;
269  m_prefetch_buffer.append(buffer + to_copy, input_remaining);
270  m_prefetch_buffer_offset = 0;
271  }
272  return length;
273 }
274 
// NOTE(review): the line carrying this function's qualified name (listing line
// 276) is absent from this extract. From the body this is presumably the
// Success() of a page-read variant of the read operation -- confirm against
// the original file.
//
// Marks the operation as done without failure and, if a handler is set, hands
// it a PageInfo covering the m_written bytes in m_buffer together with one
// CRC32C checksum per page of that data.
275 void
277 {
278  SetDone(false);
279  if (m_handler == nullptr) {return;}
280  auto status = new XrdCl::XRootDStatus();
281 
282  std::vector<uint32_t> cksums;
// Number of pages, rounded up so that a trailing partial page is counted.
283  size_t nbpages = m_written / XrdSys::PageSize;
284  if (m_written % XrdSys::PageSize) ++nbpages;
285  cksums.reserve(nbpages);
286 
// Walk the buffer page by page; the last page may be shorter than PageSize.
287  auto buffer = m_buffer;
288  size_t size = m_written;
289  for (size_t pg=0; pg<nbpages; ++pg)
290  {
291  auto pgsize = static_cast<size_t>(XrdSys::PageSize);
292  if (pgsize > size) pgsize = size;
293  cksums.push_back(XrdOucCRC::Calc32C(buffer, pgsize));
294  buffer += pgsize;
295  size -= pgsize;
296  }
297 
298  auto page_info = new XrdCl::PageInfo(m_op.first, m_written, m_buffer, std::move(cksums));
299  auto obj = new XrdCl::AnyObject();
300  obj->Set(page_info);
// Clear the handler before invoking it so that it is only ever called once.
301  auto handle = m_handler;
302  m_handler = nullptr;
303  handle->HandleResponse(status, obj);
304 }
@ kXR_ServerError
Definition: XProtocol.hh:1044
void CURL
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
std::vector< std::pair< std::string, std::string > > m_headers_list
XrdCl::ResponseHandler * m_handler
void SetPaused(bool paused)
virtual bool Setup(CURL *curl, CurlWorker &)
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
std::pair< uint64_t, uint64_t > m_op
void Success() override
bool Setup(CURL *curl, CurlWorker &) override
std::shared_ptr< XrdClHttp::HandlerQueue > m_continue_queue
bool ContinueHandle() override
CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
void ReleaseHandle() override
uint64_t GetOffset() const
bool IsMultipartByterange() const
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
static const int PageSize
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
Describe a data chunk for vector read.