XRootD
XrdClHttpOpReadV.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpOps.hh"
22 
23 #include <XrdCl/XrdClLog.hh>
25 #include <XrdOuc/XrdOucCRC.hh>
26 #include <XrdSys/XrdSysPageSize.hh>
27 
28 using namespace XrdClHttp;
29 
30 CurlVectorReadOp::CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
31  const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout,
32  HeaderCallout *header_callout) :
33  CurlOperation(handler, url, timeout, logger, callout, header_callout),
34  m_vr(new XrdCl::VectorReadInfo()),
35  m_chunk_list(op_list)
36  {}
37 
// NOTE(review): the definition line of this function (doc line 39, presumably
// `CurlVectorReadOp::Setup(CURL *curl, CurlWorker &worker)`) is missing from this listing.
//
// Prepare the curl handle for a vector read.
//  * Run the base CurlOperation::Setup(); abort on failure.
//  * Route response-body data to WriteCallback() with `this` as the callback user data.
//  * Queue a single "Range: bytes=a-b,c-d,..." request header that covers every
//    non-empty requested chunk.
// @return false if the base setup failed, true otherwise.
38 bool
40 {
41  if (!CurlOperation::Setup(curl, worker)) return false;
42  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, CurlVectorReadOp::WriteCallback);
43  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, this);
44 
// Build the comma-separated range list; zero-length chunks are omitted.
45  std::stringstream ss;
46  auto multiple = false;
47  for (const auto &chunk : m_chunk_list) {
48  if (!chunk.GetLength()) continue;
49  if (multiple) {ss << ",";}
// HTTP byte ranges are inclusive, hence the "- 1" on the end offset.
50  ss << chunk.GetOffset() << "-" << chunk.GetOffset() + chunk.GetLength() - 1;
51  multiple = true;
52  }
53  auto byte_range_val = ss.str();
// NOTE(review): if every chunk is empty (or m_chunk_list is empty), no Range header is
// added at all.  The request then asks for the whole resource, and the body is handled
// by Write()'s status-200 path.  Confirm that this is intended.
54  if (byte_range_val.size()) {
55  m_headers_list.emplace_back("Range", "bytes=" + byte_range_val);
56  }
57  return true;
58 }
59 
60 void
61 CurlVectorReadOp::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
62 {
63  std::string custom_msg = msg;
64  SetDone(true);
65  if (m_handler == nullptr) {return;}
66  std::string offset = "(unknown)";
67  std::string length = "(unknown)";
68  if (!m_chunk_list.empty()) {
69  offset = std::to_string(m_chunk_list[0].GetOffset());
70  length = std::to_string(m_chunk_list[0].GetLength());
71  }
72  if (!custom_msg.empty()) {
73  m_logger->Debug(kLogXrdClHttp, "curl operation with vector starting offset %s / length %s failed with message: %s", offset.c_str(), length.c_str(), custom_msg.c_str());
74  custom_msg += " (vector read operation starting at offset " + offset + " / length " + length + ")";
75  } else {
76  m_logger->Debug(kLogXrdClHttp, "curl vector operation starting at offset %s / length %s failed with status code %d", offset.c_str(), length.c_str(), errNum);
77  }
78  auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, custom_msg);
79  auto handle = m_handler;
80  m_handler = nullptr;
81  handle->HandleResponse(status, nullptr);
82 }
83 
// NOTE(review): this listing is missing the definition line (doc line 85, presumably
// `CurlVectorReadOp::Success()`) and doc line 94.  Confirm what line 94 does before
// changing this function.
//
// Complete the vector read successfully:
//  * mark the operation done (not failed);
//  * if a request was only partially filled, append what was received to the results;
//  * set the result size from m_bytes_consumed;
//  * transfer ownership of the VectorReadInfo (m_vr) to the response handler.
// m_handler is cleared before the callback.  A second Success()/Fail() call therefore
// returns early and never touches the already-released m_vr.
84 void
86 {
87  SetDone(false);
88  if (m_handler == nullptr) {return;}
89 
90  // If there's a partial last response, give it to the client.
91  if (m_chunk_buffer_idx) {
92  auto &chunk = m_chunk_list[m_response_idx];
93  m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
95  }
96 
97  auto status = new XrdCl::XRootDStatus();
98  m_vr->SetSize(m_bytes_consumed);
99  auto obj = new XrdCl::AnyObject();
// The AnyObject takes the VectorReadInfo; m_vr is null from here on.
100  obj->Set(m_vr.release());
101  auto handle = m_handler;
102  m_handler = nullptr;
103  handle->HandleResponse(status, obj);
104 }
105 
// NOTE(review): this listing is missing the definition line (doc line 107, presumably
// `CurlVectorReadOp::ReleaseHandle()`) and doc line 117.  Confirm whether line 117 chains
// to CurlOperation::ReleaseHandle().
//
// Detach this operation from its curl handle.  The write, HTTP-header-list, open-socket
// and socket-option callbacks and their user-data pointers are all reset to nullptr.
// Presumably this is so that a reused handle cannot call back into (or read headers
// owned by) this operation after release.  Does nothing if there is no handle.
106 void
108 {
109  if (m_curl == nullptr) return;
110  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, nullptr);
111  curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
112  curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
113  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
114  curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
115  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
116  curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
118 }
119 
120 size_t
121 CurlVectorReadOp::WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
122 {
123  return static_cast<CurlVectorReadOp*>(this_ptr)->Write(buffer, size * nitems);
124 }
125 
126 // Given a buffer of data from curl, parse it and write it to the response buffers.
//
// Three response shapes are handled:
//  * 200 (whole resource), and a 206 that is not multipart/byteranges: the body is one
//    contiguous run of bytes, described by m_current_op with an effectively unbounded
//    length.
//  * 206 multipart/byteranges: each part is a boundary line, header lines, a blank line,
//    then data.  The part's Content-Range header sets m_current_op to
//    (start offset, length).
// Data bytes are copied into the buffers of the requests in m_chunk_list (the target is
// chosen by CalculateNextBuffer()).  Each completed or partial request is appended to
// m_vr's chunk list.
// Parser state persists across calls, because curl may split the body at any byte:
// m_current_op, m_response_idx, m_chunk_buffer_idx, m_skip_bytes and m_response_headers.
//
// @param orig_buffer  Body bytes from curl (libcurl does not NUL-terminate them).
// @param orig_length  Number of bytes in orig_buffer.
// @return orig_length when the data was consumed (bodies of HTTP error responses are
//         consumed and discarded), or the value of FailCallback() on malformed framing.
//         NOTE(review): presumably FailCallback() returns something != orig_length so
//         that libcurl aborts the transfer -- confirm.
127 size_t
128 CurlVectorReadOp::Write(char *orig_buffer, size_t orig_length)
129 {
130  UpdateBytes(orig_length);
131  //m_logger->Debug(kLogXrdClHttp, "Received a write of size %ld with contents:\n%s", static_cast<long>(orig_length), std::string(orig_buffer, orig_length).c_str());
132 
133  // Handle the (hopefully uncommon) cases where the server responds to a vector read op
134  // with a single response. We set the length of the response to the max as we
135  // don't care how many bytes the server actually sends.
// NOTE(review): this block runs on *every* callback, not only the first one of the
// response.  For 200 / non-multipart responses it therefore rewinds m_current_op.first
// to the response start each time curl delivers more data, which discards the position
// that the copy loop below advanced.  It also never calls CalculateNextBuffer().  The
// first bytes therefore land in m_chunk_list[m_response_idx] whatever that request's
// offset is, and the indexing is out of range if m_chunk_list is empty.  Consider
// initialising only when m_current_op is still unset (presumably (-1, -1)).
136  if (GetStatusCode() == 200) {
137  m_current_op.first = 0;
138  m_current_op.second = std::numeric_limits<off_t>::max();
139  } else if (HTTPStatusIsError(GetStatusCode())) {
140  return orig_length;
141  } else if (!m_headers.IsMultipartByterange()) {
142  m_current_op.first = m_headers.GetOffset();
143  m_current_op.second = std::numeric_limits<off_t>::max();
144  }
145 
146  auto buffer = orig_buffer;
147  auto length = orig_length;
148 
149  while (length) {
150  // If we're in the middle of a response chunk, copy as much data as possible.
151  if (m_current_op.first != -1 && m_current_op.second != -1) {
152  //m_logger->Debug(kLogXrdClHttp, "Processing response buffer of (%lld, %lld)", static_cast<long long>(m_current_op.first), static_cast<long long>(m_current_op.second));
// Discard response bytes that no request wants (the count is set by CalculateNextBuffer()).
// This branch leaves m_current_op untouched, so the skipped bytes must already be
// accounted for in m_current_op.
// NOTE(review): verify that CalculateNextBuffer() advances m_current_op past the
// skipped bytes.  Otherwise the (first, second) bookkeeping below drifts by the
// skipped amount.
153  if (m_skip_bytes) {
154  //m_logger->Debug(kLogXrdClHttp, "Skipping %lld bytes", static_cast<long long>(m_skip_bytes));
155  auto to_skip = (m_skip_bytes < length) ? m_skip_bytes : length;
156  buffer += to_skip;
157  length -= to_skip;
158  m_skip_bytes -= to_skip;
159  continue;
160  } else {
// Copy into the current request's buffer, resuming at m_chunk_buffer_idx.
161  auto &chunk = m_chunk_list[m_response_idx];
162  auto remaining = static_cast<off_t>(chunk.GetLength()) - m_chunk_buffer_idx;
163  if (remaining < 0) {
164  return FailCallback(kXR_ServerError, "Invalid chunk framing");
165  }
// NOTE(review): to_copy is bounded by the request size and the curl buffer, but not by
// the bytes left in the response segment (m_current_op.second - m_chunk_buffer_idx).
// When a segment ends inside a request and more data follows in the same callback, the
// next multipart delimiter is copied into the request buffer.  The
// `m_current_op.second == m_chunk_buffer_idx` branch below only fires if curl happened
// to split the data exactly at the segment end.
166  auto to_copy = (static_cast<size_t>(remaining) < length) ? static_cast<size_t>(remaining) : length;
167  //m_logger->Debug(kLogXrdClHttp, "Copying %lld bytes to request buffer %ld at offset %lld", static_cast<long long>(to_copy), m_response_idx, static_cast<long long>(m_chunk_buffer_idx));
168  memcpy(static_cast<char *>(chunk.GetBuffer()) + m_chunk_buffer_idx, buffer, to_copy);
169  m_chunk_buffer_idx += to_copy;
170  buffer += to_copy;
171  length -= to_copy;
172  // Handle cases where the requested or response chunk is complete
173  if (chunk.GetLength() == m_chunk_buffer_idx) {
// The request is fully received: record it in the results and move to the next request.
174  m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
176  m_chunk_buffer_idx = 0;
177  m_response_idx++;
// The segment ends exactly at the end of this request: mark "no active segment", so
// the next bytes are parsed as a boundary line.
178  if (m_current_op.second == chunk.GetLength()) {
179  m_current_op.first = m_current_op.second = -1;
180  } else {
181  // We may need to skip the remaining bytes or, potentially, the server
182  // coalesced two adjacent requests into one larger response.
183  m_current_op.first += chunk.GetLength();
184  m_current_op.second -= chunk.GetLength();
185  CalculateNextBuffer();
186  continue;
187  }
188  } else if (m_current_op.second == m_chunk_buffer_idx) {
189  // There are no more bytes in the response but the requested chunk hasn't finished.
190  // Add what we have to the results and create a new chunk on the request list from the remainder; perhaps
191  // the server will send it in the future.
// NOTE(review): `chunk` is a reference into m_chunk_list.  The emplace_back below may
// reallocate the vector, so `chunk` can dangle when it is read again on the next line.
// Record the result (or copy the offset/buffer) before growing m_chunk_list.
192  m_chunk_list.emplace_back(chunk.GetOffset() + m_chunk_buffer_idx, chunk.GetLength() - m_chunk_buffer_idx, static_cast<char*>(chunk.GetBuffer()) + m_chunk_buffer_idx);
193  m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
195  m_chunk_buffer_idx = 0;
196  m_current_op.first = m_current_op.second = -1;
197  m_response_idx++;
198  }
199  }
200  }
201  if (m_skip_bytes) {
202  continue;
203  }
204 
205  // We are at the boundary between chunks; we must parse header lines to understand the
206  // next thing to do.
207 
208  // The following lambda function returns a string view to the next complete header line,
209  // potentially partially from the previous buffer from curl. If the second item in
210  // the returned pair is false, then we ran out of buffer from curl before finding a
211  // complete line.
// The returned view points either into curl's buffer or into m_response_headers.
// NOTE(review): this function appends to m_response_headers but never clears it.  After
// the first line that straddles two curl callbacks, every later line is therefore
// returned with the stale prefix prepended, which breaks boundary and header matching.
// Confirm that nothing else resets it.
// A "\r" at the end of one callback followed by "\n" at the start of the next is also
// not recognised as a line end.
212  auto get_next_line = [&]() {
213  std::string_view chunk_header(buffer, length);
214  auto pos = chunk_header.find("\r\n");
215  if (pos == std::string_view::npos) {
216  m_response_headers += chunk_header;
217  length = 0;
218  return std::make_pair(std::string_view(), false);
219  } else {
220  auto line = chunk_header.substr(0, pos);
221  if (!m_response_headers.empty()) {
222  m_response_headers += line;
223  line = m_response_headers;
224  }
225  buffer += pos + 2;
226  length -= pos + 2;
227  return std::make_pair(line, true);
228  }
229  };
230 
231  // Consume the boundary line.
232  bool last_segment = false;
233  while (true) {
234  auto [line, ok] = get_next_line();
235  if (!ok) {
236  return orig_length;
237  }
238  // Per RFC7233, Appendix A, Implementation note 1, multiple CRLF might precede the
239  // first boundary string in the body. However, the XRootD server appears to have an
240  // extra CRLF in front of every boundary string.
241  if (line.empty()) {continue;}
242  if (line == m_headers.MultipartSeparator()) {
243  break;
244  }
// The closing delimiter is the separator followed by "--".
245  if (line == m_headers.MultipartSeparator() + "--") {
246  last_segment = true;
247  break;
248  }
249  std::stringstream ss;
250  ss << "Server has responded with an invalid boundary line: '" << line << "' (expected '" << m_headers.MultipartSeparator() << "')";
251  return FailCallback(kXR_ServerError, ss.str());
252  }
// After the closing delimiter, any remaining bytes (the epilogue) are ignored.
253  if (last_segment) {
254  length = 0;
255  break;
256  }
257  // Consume the header lines
258  while (true) {
259  auto [line, ok] = get_next_line();
260  if (!ok) {
261  return orig_length;
262  }
// A blank line terminates the part headers; part data follows.
263  if (line.empty()) {
264  break;
265  }
266  auto header_name_end = line.find(':');
267  if (header_name_end == std::string_view::npos) {
268  std::stringstream ss; ss << "Invalid header line in response from server: " << line;
269  return FailCallback(kXR_ServerError, ss.str());
270  }
271  auto header_name = line.substr(0, header_name_end);
272  // Cannot use strcasecmp here as a string_view's data is not necessarily nul-terminated.
273  // len("content-range") == 13
274  if (header_name.size() != 13 || strncasecmp(header_name.data(), "content-range", 13)) {
275  continue;
276  }
277  // We are parsing a Content-Range value.
278  // Example: Content-Range: bytes 7000-7999/8000
279  auto value = line.substr(header_name_end + 1);
280 
281  // Advance whitespace
282  while (!value.empty() && value[0] == ' ') {
283  value = value.substr(1);
284  }
285 
286  if (value.substr(0, 5) != "bytes") {
287  std::stringstream ss; ss << "Invalid Content-Range value (no 'bytes' unit): " << value;
288  return FailCallback(kXR_ServerError, ss.str());
289  }
290 
291  value = value.substr(5);
292  while (!value.empty() && value[0] == ' ') {
293  value = value.substr(1);
294  }
295 
296  // Example: 500-999/8000
297  size_t count;
298  long long bytes_val;
299  try {
300  // It may seem strange to see the string_view data being passed to std::stoll here
301  // as it's not guaranteed to be null-terminated. However, by this point, we do know
302  // there's a CRLF in the buffer -- that is sufficient to guarantee the stoll search
303  // terminates before it goes out-of-bounds.
// NOTE(review): the reasoning above does not hold.  std::stoll takes a
// const std::string&, so value.data() is first copied into a temporary std::string, and
// that copy scans for a NUL terminator; a CRLF does not stop the scan.  When `line`
// points into curl's (non-NUL-terminated) buffer, this can read past the end of the
// buffer.  Parsing a bounded range, e.g. std::from_chars over
// [value.data(), value.data() + value.size()), avoids this.
// std::stoll also accepts leading whitespace and a sign, so a negative range start is
// not rejected here.
304  bytes_val = std::stoll(value.data(), &count);
305  } catch (std::invalid_argument &) {
306  std::stringstream ss; ss << "Invalid Content-Range value (no integer in range start): " << value;
307  return FailCallback(kXR_ServerError, ss.str());
308  } catch (std::out_of_range &) {
309  std::stringstream ss; ss << "Invalid Content-Range value (out of range): " << value;
310  return FailCallback(kXR_ServerError, ss.str());
311  }
312  if (value.size() <= count || value[count] != '-') {
313  std::stringstream ss; ss << "Invalid Content-Range value (no dash in range): " << value;
314  return FailCallback(kXR_ServerError, ss.str());
315  }
316  m_current_op.first = bytes_val;
317  value = value.substr(count + 1);
// Same NUL-termination caveat as the range-start parse above.
318  try {
319  bytes_val = std::stoll(value.data(), &count);
320  } catch (std::invalid_argument &) {
321  std::stringstream ss; ss << "Invalid Content-Range value (no integer in range end): " << value;
322  return FailCallback(kXR_ServerError, ss.str());
323  } catch (std::out_of_range &) {
324  std::stringstream ss; ss << "Invalid Content-Range value (out of range in range end): " << value;
325  return FailCallback(kXR_ServerError, ss.str());
326  }
327  if (value.size() <= count || value[count] != '/') {
328  std::stringstream ss; ss << "Invalid Content-Range value (no trailing /): " << value;
329  return FailCallback(kXR_ServerError, ss.str());
330  }
// The range end is inclusive, hence "+ 1".  Note that this `length` shadows the outer
// byte count of the current curl buffer; here it is the segment length.
331  auto length = bytes_val + 1 - m_current_op.first;
332  if (length < 0) {
333  std::stringstream ss; ss << "Invalid Content-Range value (negative length): " << line;
334  return FailCallback(kXR_ServerError, ss.str());
335  }
// With a 64-bit off_t this comparison can never be true; it only matters where off_t is
// narrower than long long.
336  if (length > std::numeric_limits<decltype(m_current_op.second)>::max()) {
337  std::stringstream ss; ss << "Invalid Content-Range value (length too long): " << line;
338  return FailCallback(kXR_ServerError, ss.str());
339  }
340  m_current_op.second = length;
341 
342  // We now have a valid response range; locate a buffer where we will copy the bytes into.
343  CalculateNextBuffer();
344  }
345  // Check to see if the Content-Range was missing.
346  if (!last_segment && (m_current_op.first == -1 || m_current_op.second == -1)) {
347  return FailCallback(kXR_ServerError, "Response segment is missing a Content-Range header");
348  }
349  }
350  return orig_length;
351 }
352 
353 void CurlVectorReadOp::CalculateNextBuffer() {
354  // Strategy is to select the index where we will throw away the fewest bytes.
355  off_t distance = std::numeric_limits<off_t>::max();
356  auto starting_idx = m_response_idx;
357  for (decltype(m_chunk_list)::size_type ctr=0; ctr<m_chunk_list.size(); ctr++) {
358  auto idx = (starting_idx + ctr) % m_chunk_list.size();
359  if (static_cast<uint64_t>(m_current_op.first) == m_chunk_list[idx].GetOffset()) {
360  m_response_idx = idx;
361  distance = 0;
362  break;
363  }
364  off_t bytes_to_skip = static_cast<off_t>(m_chunk_list[idx].GetOffset()) - m_current_op.first;
365  //m_logger->Debug(kLogXrdClHttp, "Using client request at index %lu would require us to skip %lld bytes", idx, static_cast<long long>(bytes_to_skip));
366  if (bytes_to_skip > 0 && bytes_to_skip < distance) {
367  distance = bytes_to_skip;
368  m_response_idx = idx;
369  // Note we don't break; some other request might be a better fit.
370  }
371  }
372  m_chunk_buffer_idx = 0;
373  if (distance > 0) {
374  m_skip_bytes = distance;
375  } else {
376  m_skip_bytes = 0;
377  }
378 }
@ kXR_ServerError
Definition: XProtocol.hh:1044
void CURL
if(Avsz)
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
std::vector< std::pair< std::string, std::string > > m_headers_list
XrdCl::ResponseHandler * m_handler
virtual bool Setup(CURL *curl, CurlWorker &)
size_t Write(char *buffer, size_t size)
CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
XrdCl::ChunkList m_chunk_list
std::pair< off_t, off_t > m_current_op
std::unique_ptr< XrdCl::VectorReadInfo > m_vr
bool Setup(CURL *curl, CurlWorker &) override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
uint64_t GetOffset() const
bool IsMultipartByterange() const
const std::string & MultipartSeparator() const
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
bool HTTPStatusIsError(unsigned status)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::vector< ChunkInfo > ChunkList
List of chunks.
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp