XRootD
XrdHttpTpcMultistream.cc
Go to the documentation of this file.
1 
5 #include "XrdHttpTpcTPC.hh"
6 #include "XrdHttpTpcState.hh"
7 
8 #include "XrdSys/XrdSysError.hh"
9 
10 #include <curl/curl.h>
11 
12 #include <algorithm>
13 #include <sstream>
14 #include <stdexcept>
15 
16 
17 using namespace TPC;
18 
/**
 * Raised when the libcurl machinery needed for a multi-stream transfer cannot
 * be set up (e.g. curl_multi_init() fails).  Callers catch it separately from
 * other std::runtime_error instances because it occurs before any chunked
 * response has been started.
 */
class CurlHandlerSetupError : public std::runtime_error {
public:
    /// @param msg Human-readable description of the setup failure.
    explicit CurlHandlerSetupError(const std::string &msg) :
        std::runtime_error(msg)
    {}

    ~CurlHandlerSetupError() noexcept override = default;
};
27 
28 namespace {
29 class MultiCurlHandler {
30 public:
31  MultiCurlHandler(std::vector<State*> &states, XrdSysError &log) :
32  m_handle(curl_multi_init()),
33  m_states(states),
34  m_log(log),
35  m_bytes_transferred(0),
36  m_error_code(0),
37  m_status_code(0)
38  {
39  if (m_handle == NULL) {
40  throw CurlHandlerSetupError("Failed to initialize a libcurl multi-handle");
41  }
42  m_avail_handles.reserve(states.size());
43  m_active_handles.reserve(states.size());
44  for (std::vector<State*>::const_iterator state_iter = states.begin();
45  state_iter != states.end();
46  state_iter++) {
47  m_avail_handles.push_back((*state_iter)->GetHandle());
48  }
49  }
50 
51  ~MultiCurlHandler()
52  {
53  if (!m_handle) {return;}
54  for (std::vector<CURL *>::const_iterator it = m_active_handles.begin();
55  it != m_active_handles.end();
56  it++) {
57  curl_multi_remove_handle(m_handle, *it);
58  }
59  curl_multi_cleanup(m_handle);
60  }
61 
62  MultiCurlHandler(const MultiCurlHandler &) = delete;
63 
64  CURLM *Get() const {return m_handle;}
65 
66  void FinishCurlXfer(CURL *curl) {
67  CURLMcode mres = curl_multi_remove_handle(m_handle, curl);
68  if (mres) {
69  std::stringstream ss;
70  ss << "Failed to remove transfer from set: "
71  << curl_multi_strerror(mres);
72  throw std::runtime_error(ss.str());
73  }
74  for (std::vector<State*>::iterator state_iter = m_states.begin();
75  state_iter != m_states.end();
76  state_iter++) {
77  if (curl == (*state_iter)->GetHandle()) {
78  m_bytes_transferred += (*state_iter)->BytesTransferred();
79  int error_code = (*state_iter)->GetErrorCode();
80  if (error_code && !m_error_code) {
81  m_error_code = error_code;
82  m_error_message = (*state_iter)->GetErrorMessage();
83  }
84  int status_code = (*state_iter)->GetStatusCode();
85  if (status_code >= 400 && !m_status_code) {
86  m_status_code = status_code;
87  m_error_message = (*state_iter)->GetErrorMessage();
88  }
89  (*state_iter)->ResetAfterRequest();
90  break;
91  }
92  }
93  for (std::vector<CURL *>::iterator iter = m_active_handles.begin();
94  iter != m_active_handles.end();
95  ++iter)
96  {
97  if (*iter == curl) {
98  m_active_handles.erase(iter);
99  break;
100  }
101  }
102  m_avail_handles.push_back(curl);
103  }
104 
105  off_t StartTransfers(off_t current_offset, off_t content_length, size_t block_size,
106  int &running_handles) {
107  bool started_new_xfer = false;
108  do {
109  size_t xfer_size = std::min(content_length - current_offset, static_cast<off_t>(block_size));
110  if (xfer_size == 0) {return current_offset;}
111  if (!(started_new_xfer = StartTransfer(current_offset, xfer_size))) {
112  // In this case, we need to start new transfers but weren't able to.
113  if (running_handles == 0) {
114  if (!CanStartTransfer(true)) {
115  m_log.Emsg("StartTransfers", "Unable to start transfers.");
116  }
117  }
118  break;
119  } else {
120  running_handles += 1;
121  }
122  current_offset += xfer_size;
123  } while (true);
124  return current_offset;
125  }
126 
127  int Flush() {
128  int last_error = 0;
129  for (std::vector<State*>::iterator state_it = m_states.begin();
130  state_it != m_states.end();
131  state_it++)
132  {
133  int error = (*state_it)->Flush();
134  if (error) {last_error = error;}
135  }
136  return last_error;
137  }
138 
139  off_t BytesTransferred() const {
140  return m_bytes_transferred;
141  }
142 
143  int GetStatusCode() const {
144  return m_status_code;
145  }
146 
147  int GetErrorCode() const {
148  return m_error_code;
149  }
150 
151  void SetErrorCode(int error_code) {
152  m_error_code = error_code;
153  }
154 
155  std::string GetErrorMessage() const {
156  return m_error_message;
157  }
158 
159  void SetErrorMessage(const std::string &error_msg) {
160  m_error_message = error_msg;
161  }
162 
163 private:
164 
165  bool StartTransfer(off_t offset, size_t size) {
166  if (!CanStartTransfer(false)) {return false;}
167  for (std::vector<CURL*>::const_iterator handle_it = m_avail_handles.begin();
168  handle_it != m_avail_handles.end();
169  handle_it++) {
170  for (std::vector<State*>::iterator state_it = m_states.begin();
171  state_it != m_states.end();
172  state_it++) {
173  if ((*state_it)->GetHandle() == *handle_it) { // This state object represents an idle handle.
174  (*state_it)->SetTransferParameters(offset, size);
175  ActivateHandle(**state_it);
176  return true;
177  }
178  }
179  }
180  return false;
181  }
182 
183  void ActivateHandle(State &state) {
184  CURL *curl = state.GetHandle();
185  m_active_handles.push_back(curl);
186  CURLMcode mres;
187  mres = curl_multi_add_handle(m_handle, curl);
188  if (mres) {
189  std::stringstream ss;
190  ss << "Failed to add transfer to libcurl multi-handle"
191  << curl_multi_strerror(mres);
192  throw std::runtime_error(ss.str());
193  }
194  for (auto iter = m_avail_handles.begin();
195  iter != m_avail_handles.end();
196  ++iter)
197  {
198  if (*iter == curl) {
199  m_avail_handles.erase(iter);
200  break;
201  }
202  }
203  }
204 
205  bool CanStartTransfer(bool log_reason) const {
206  size_t idle_handles = m_avail_handles.size();
207  size_t transfer_in_progress = 0;
208  for (std::vector<State*>::const_iterator state_iter = m_states.begin();
209  state_iter != m_states.end();
210  state_iter++) {
211  for (std::vector<CURL*>::const_iterator handle_iter = m_active_handles.begin();
212  handle_iter != m_active_handles.end();
213  handle_iter++) {
214  if (*handle_iter == (*state_iter)->GetHandle()) {
215  transfer_in_progress += (*state_iter)->BodyTransferInProgress();
216  break;
217  }
218  }
219  }
220  if (!idle_handles) {
221  if (log_reason) {
222  m_log.Emsg("CanStartTransfer", "Unable to start transfers as no idle CURL handles are available.");
223  }
224  return false;
225  }
226  ssize_t available_buffers = m_states[0]->AvailableBuffers();
227  // To be conservative, set aside buffers for any transfers that have been activated
228  // but don't have their first responses back yet.
229  available_buffers -= (m_active_handles.size() - transfer_in_progress);
230  if (log_reason && (available_buffers == 0)) {
231  std::stringstream ss;
232  ss << "Unable to start transfers as no buffers are available. Available buffers: " <<
233  m_states[0]->AvailableBuffers() << ", Active curl handles: " << m_active_handles.size()
234  << ", Transfers in progress: " << transfer_in_progress;
235  m_log.Emsg("CanStartTransfer", ss.str().c_str());
236  if (m_states[0]->AvailableBuffers() == 0) {
237  m_states[0]->DumpBuffers();
238  }
239  }
240  return available_buffers > 0;
241  }
242 
243  CURLM *m_handle;
244  std::vector<CURL *> m_avail_handles;
245  std::vector<CURL *> m_active_handles;
246  std::vector<State*> &m_states;
247  XrdSysError &m_log;
248  off_t m_bytes_transferred;
249  int m_error_code;
250  int m_status_code;
251  std::string m_error_message;
252 };
253 }
254 
255 
256 int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state,
257  size_t streams, std::vector<State*> &handles,
258  std::vector<ManagedCurlHandle> &curl_handles, TPCLogRecord &rec)
259 {
260  bool success;
261  // The content-length was set thanks to the call to GetContentLengthTPCPull() before calling this function
262  off_t content_size = state.GetContentLength();
263  off_t current_offset = 0;
264 
265  size_t concurrency = streams * m_pipelining_multiplier;
266 
267  handles.reserve(concurrency);
268  handles.push_back(new State());
269  handles[0]->Move(state);
270  for (size_t idx = 1; idx < concurrency; idx++) {
271  handles.push_back(handles[0]->Duplicate());
272  curl_handles.emplace_back(handles.back()->GetHandle());
273  }
274 
275  // Notify the packet marking manager that the transfer will start after this point
276  rec.pmarkManager.startTransfer();
277 
278  // Create the multi-handle and add in the current transfer to it.
279  MultiCurlHandler mch(handles, m_log);
280  CURLM *multi_handle = mch.Get();
281 
282  curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, 1);
283  curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS, streams);
284 
285  // Start response to client prior to the first call to curl_multi_perform
286  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
287  if (retval) {
288  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
289  "Failed to send the initial response to the TPC client");
290  return retval;
291  } else {
292  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
293  "Initial transfer response sent to the TPC client");
294  }
295 
296  // Start assigning transfers
297  int running_handles = 0;
298  current_offset = mch.StartTransfers(current_offset, content_size, m_block_size, running_handles);
299 
300  // Transfer loop: use curl to actually run the transfer, but periodically
301  // interrupt things to send back performance updates to the client.
302  time_t last_marker = 0;
303  // Track the time since the transfer last made progress
304  off_t last_advance_bytes = 0;
305  time_t last_advance_time = time(NULL);
306  time_t transfer_start = last_advance_time;
307  CURLcode res = static_cast<CURLcode>(-1);
308  CURLMcode mres = CURLM_OK;
309  do {
310  time_t now = time(NULL);
311  time_t next_marker = last_marker + m_marker_period;
312  if (now >= next_marker) {
313  if (current_offset > last_advance_bytes) {
314  last_advance_bytes = current_offset;
315  last_advance_time = now;
316  }
317  if (SendPerfMarker(req, rec, handles, current_offset)) {
318  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
319  "Failed to send a perf marker to the TPC client");
320  return -1;
321  }
322  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
323  if (now > last_advance_time + timeout) {
324  const char *log_prefix = rec.log_prefix.c_str();
325  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
326 
327  mch.SetErrorCode(10);
328  std::stringstream ss;
329  ss << "Transfer failed because no bytes have been "
330  << (tpc_pull ? "received from the source (pull mode) in "
331  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
332  mch.SetErrorMessage(ss.str());
333  break;
334  }
335  last_marker = now;
336  }
337 
338  mres = curl_multi_perform(multi_handle, &running_handles);
339  if (mres == CURLM_CALL_MULTI_PERFORM) {
340  // curl_multi_perform should be called again immediately. On newer
341  // versions of curl, this is no longer used.
342  continue;
343  } else if (mres != CURLM_OK) {
344  break;
345  }
346 
347  rec.pmarkManager.beginPMarks();
348 
349 
350  // Harvest any messages, looking for CURLMSG_DONE.
351  CURLMsg *msg;
352  do {
353  int msgq = 0;
354  msg = curl_multi_info_read(multi_handle, &msgq);
355  if (msg && (msg->msg == CURLMSG_DONE)) {
356  CURL *easy_handle = msg->easy_handle;
357  res = msg->data.result;
358  mch.FinishCurlXfer(easy_handle);
359  // If any requests fail, cut off the entire transfer.
360  if (res != CURLE_OK) {
361  break;
362  }
363  }
364  } while (msg);
365  if (res != static_cast<CURLcode>(-1) && res != CURLE_OK) {
366  std::stringstream ss;
367  ss << "Breaking loop due to failed curl transfer: " << curl_easy_strerror(res);
368  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_CURL_FAILURE",
369  ss.str());
370  break;
371  }
372 
373  if (running_handles < static_cast<int>(concurrency)) {
374  // Issue new transfers if there is still pending work to do.
375  // Otherwise, continue running until there are no handles left.
376  if (current_offset != content_size) {
377  current_offset = mch.StartTransfers(current_offset, content_size,
378  m_block_size, running_handles);
379  if (!running_handles) {
380  std::stringstream ss;
381  ss << "No handles are able to run. Streams=" << streams << ", concurrency="
382  << concurrency;
383 
384  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE", ss.str());
385  }
386  } else if (running_handles == 0) {
387  logTransferEvent(LogMask::Debug, rec, "MULTISTREAM_IDLE",
388  "Unable to start new transfers; breaking loop.");
389  break;
390  }
391  }
392 
393  int64_t max_sleep_time = next_marker - time(NULL);
394  if (max_sleep_time <= 0) {
395  continue;
396  }
397  int fd_count;
398  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000,
399  &fd_count);
400  if (mres != CURLM_OK) {
401  break;
402  }
403  } while (running_handles);
404 
405  if (mres != CURLM_OK) {
406  std::stringstream ss;
407  ss << "Internal libcurl multi-handle error: "
408  << curl_multi_strerror(mres);
409  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", ss.str());
410  throw std::runtime_error(ss.str());
411  }
412 
413  // Harvest any messages, looking for CURLMSG_DONE.
414  CURLMsg *msg;
415  do {
416  int msgq = 0;
417  msg = curl_multi_info_read(multi_handle, &msgq);
418  if (msg && (msg->msg == CURLMSG_DONE)) {
419  CURL *easy_handle = msg->easy_handle;
420  mch.FinishCurlXfer(easy_handle);
421  if (res == CURLE_OK || res == static_cast<CURLcode>(-1))
422  res = msg->data.result; // Transfer result will be examined below.
423  }
424  } while (msg);
425 
426  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
427  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
428  "Internal state error in libcurl");
429  throw std::runtime_error("Internal state error in libcurl");
430  }
431 
432  mch.Flush();
433 
434  rec.bytes_transferred = mch.BytesTransferred();
435  rec.tpc_status = mch.GetStatusCode();
436 
437  // Generate the final response back to the client.
438  std::stringstream ss;
439  success = false;
440  if (mch.GetStatusCode() >= 400) {
441  std::string err = mch.GetErrorMessage();
442  std::stringstream ss2;
443  ss2 << "Remote side failed with status code " << mch.GetStatusCode();
444  if (!err.empty()) {
445  std::replace(err.begin(), err.end(), '\n', ' ');
446  ss2 << "; error message: \"" << err << "\"";
447  }
448  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss.str());
449  ss << generateClientErr(ss2, rec);
450  } else if (mch.GetErrorCode()) {
451  std::string err = mch.GetErrorMessage();
452  if (err.empty()) {err = "(no error message provided)";}
453  else {std::replace(err.begin(), err.end(), '\n', ' ');}
454  std::stringstream ss2;
455  ss2 << "Error when interacting with local filesystem: " << err;
456  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
457  ss << generateClientErr(ss2, rec);
458  } else if (res != CURLE_OK) {
459  std::stringstream ss2;
460  ss2 << "Request failed when processing";
461  std::stringstream ss3;
462  ss3 << ss2.str() << ":" << curl_easy_strerror(res);
463  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss3.str());
464  ss << generateClientErr(ss2, rec, res);
465  } else if (current_offset != content_size) {
466  std::stringstream ss2;
467  ss2 << "Internal logic error led to early abort; current offset is " <<
468  current_offset << " while full size is " << content_size;
469  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_FAIL", ss2.str());
470  ss << generateClientErr(ss2, rec);
471  } else {
472  if (!handles[0]->Finalize()) {
473  std::stringstream ss2;
474  ss2 << "Failed to finalize and close file handle.";
475  ss << generateClientErr(ss2, rec);
476  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR",
477  ss2.str());
478  } else {
479  ss << "success: Created";
480  success = true;
481  }
482  }
483 
484  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
485  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
486  "Failed to send last update to remote client");
487  return retval;
488  } else if (success) {
489  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
490  rec.status = 0;
491  }
492  return req.ChunkResp(NULL, 0);
493 }
494 
495 
496 int TPCHandler::RunCurlWithStreams(XrdHttpExtReq &req, State &state,
497  size_t streams, TPCLogRecord &rec)
498 {
499  std::vector<ManagedCurlHandle> curl_handles;
500  std::vector<State*> handles;
501  std::stringstream err_ss;
502  try {
503  int retval = RunCurlWithStreamsImpl(req, state, streams, handles, curl_handles, rec);
504  for (std::vector<State*>::iterator state_iter = handles.begin();
505  state_iter != handles.end();
506  state_iter++) {
507  delete *state_iter;
508  }
509  return retval;
510  } catch (CurlHandlerSetupError &e) {
511  for (std::vector<State*>::iterator state_iter = handles.begin();
512  state_iter != handles.end();
513  state_iter++) {
514  delete *state_iter;
515  }
516 
517  rec.status = 500;
518  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
519  std::stringstream ss;
520  ss << e.what();
521  err_ss << generateClientErr(ss, rec);
522  return req.SendSimpleResp(rec.status, NULL, NULL, e.what(), 0);
523  } catch (std::runtime_error &e) {
524  for (std::vector<State*>::iterator state_iter = handles.begin();
525  state_iter != handles.end();
526  state_iter++) {
527  delete *state_iter;
528  }
529 
530  logTransferEvent(LogMask::Error, rec, "MULTISTREAM_ERROR", e.what());
531  std::stringstream ss;
532  ss << e.what();
533  err_ss << generateClientErr(ss, rec);
534  int retval;
535  if ((retval = req.ChunkResp(err_ss.str().c_str(), 0))) {
536  return retval;
537  }
538  return req.ChunkResp(NULL, 0);
539  }
540 }
void CURL
#define Duplicate(x, y)
bool Debug
@ Error
CurlHandlerSetupError(const std::string &msg)
virtual ~CurlHandlerSetupError() noexcept
CURL * GetHandle() const
int GetErrorCode() const
off_t GetContentLength() const
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with a NULL body indicates that this is the last chunk of the response.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; the body of the response is then sent over multiple parts using ChunkResp.
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.