XRootD
XrdHttpTpcState.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <sstream>
4 #include <stdexcept>
5 
6 #include "XrdVersion.hh"
9 #include "XrdOuc/XrdOucTUtils.hh"
10 
11 #include <curl/curl.h>
12 
13 #include "XrdHttpTpcState.hh"
14 #include "XrdHttpTpcStream.hh"
15 
17 
18 using namespace TPC;
19 
20 
22  if (m_headers) {
23  curl_slist_free_all(m_headers);
24  m_headers = NULL;
25  if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
26  }
27 }
28 
29 
30 void State::Move(State &other)
31 {
32  m_push = other.m_push;
33  m_recv_status_line = other.m_recv_status_line;
34  m_recv_all_headers = other.m_recv_all_headers;
35  m_offset = other.m_offset;
36  m_start_offset = other.m_start_offset;
37  m_status_code = other.m_status_code;
38  m_content_length = other.m_content_length;
39  m_push_length = other.m_push_length;
40  m_stream = other.m_stream;
41  m_curl = other.m_curl;
42  m_headers = other.m_headers;
43  m_headers_copy = other.m_headers_copy;
44  m_resp_protocol = other.m_resp_protocol;
45  m_is_transfer_state = other.m_is_transfer_state;
46  curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
47  if (m_is_transfer_state) {
48  if (m_push) {
49  curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
50  } else {
51  curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
52  }
53  }
54  tpcForwardCreds = other.tpcForwardCreds;
55  other.m_headers_copy.clear();
56  other.m_curl = NULL;
57  other.m_headers = NULL;
58  other.m_stream = NULL;
59  other.m_repr_digests = m_repr_digests;
60 }
61 
62 
63 bool State::InstallHandlers(CURL *curl) {
64  curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
65  curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
66  curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
67  if(m_is_transfer_state) {
68  if (m_push) {
69  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
70  curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
71  curl_easy_setopt(curl, CURLOPT_READDATA, this);
72  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::PushRespCB);
73  curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
74  struct stat buf;
75  if (SFS_OK == m_stream->Stat(&buf)) {
76  m_push_length = buf.st_size;
77  curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
78  }
79  } else {
80  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
81  curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
82  }
83  }
84  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
85  if(tpcForwardCreds) {
86  curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
87  }
88 
89  // Only use low-speed limits with libcurl v7.38 or later.
90  // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
91  curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
92  if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
93  // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
94  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
95  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
96  }
97  return true;
98 }
99 
108  struct curl_slist *list = NULL;
109  for (const auto & [header,value]: req.headers) {
110  if (!strncasecmp(header.c_str(),"copy-header", 11)) {
111  list = curl_slist_append(list, value.c_str());
112  m_headers_copy.emplace_back(value);
113  }
114  // Note: len("TransferHeader") == 14
115  if (!strncasecmp(header.c_str(),"transferheader",14)) {
116  std::stringstream ss;
117  ss << header.substr(14) << ": " << value;
118  list = curl_slist_append(list, ss.str().c_str());
119  m_headers_copy.emplace_back(ss.str());
120  }
121  }
122 
123  if(m_is_transfer_state && !m_push && !req.mReprDigest.empty()) {
124  size_t reprDigestSize = req.mReprDigest.size();
125  std::stringstream ss;
126  ss << "Want-Repr-Digest: ";
127  size_t cpt = 1;
128  for (const auto &kv: req.mReprDigest) {
129  // We put the same weight for the digest names as we do not have any way, according to the specs,
130  // to give priority to a digest name in particular
131  ss << kv.first << '=' << 5;
132  if(cpt < reprDigestSize) {
133  ss << ',';
134  }
135  cpt++;
136  }
137  list = curl_slist_append(list, ss.str().c_str());
138  m_headers_copy.emplace_back(ss.str());
139  }
140 
141  if (m_is_transfer_state && m_push && m_push_length > 0) {
142  // On libcurl 8.5.0 - 8.9.1, we've observed bugs causing failures whenever
143  // `Expect: 100-continue` is not used. Older versions of libcurl unconditionally
144  // set `Expect` whenever PUT is used (likely an older bug). To workaround the issue,
145  // we force `Expect` to be set, triggering the older libcurl behavior.
146  // See: https://github.com/xrootd/xrootd/issues/2470
147  // See: https://github.com/curl/curl/issues/17004
148  list = curl_slist_append(list, "Expect: 100-continue");
149  // Add Repr-Digest header to PUT request (PUSH)
150  auto reprDigest = XrdOucTUtils::caseInsensitiveFind(req.headers,"repr-digest");
151  if(reprDigest != req.headers.end()) {
152  std::string reprDigestHeader {"Repr-Digest: " + reprDigest->second};
153  curl_slist_append(list,reprDigestHeader.c_str());
154  }
155  }
156 
157  if (list != nullptr) {
158  curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
159  m_headers = list;
160  }
161 }
162 
164  struct curl_slist *list = NULL;
165  for (const auto & [header,value]: req.headers) {
166  if (!strncasecmp(header.c_str(),"copy-header", 11)) {
167  list = curl_slist_append(list, value.c_str());
168  }
169  // Note: len("TransferHeader") == 14
170  if (!strncasecmp(header.c_str(),"transferheader",14)) {
171  std::stringstream ss;
172  ss << header.substr(14) << ": " << value;
173  list = curl_slist_append(list, ss.str().c_str());
174  }
175  }
176  if(!req.mReprDigest.empty()) {
177  size_t reprDigestSize = req.mReprDigest.size();
178  std::stringstream ss;
179  ss << "Want-Repr-Digest: ";
180  size_t cpt = 1;
181  for (const auto &kv: req.mReprDigest) {
182  // We put the same weight for the digest names as we do not have any way, according to the specs,
183  // to give priority to a digest name in particular
184  ss << kv.first << '=' << 5;
185  if(cpt < reprDigestSize) {
186  ss << ',';
187  }
188  cpt++;
189  }
190  list = curl_slist_append(list, ss.str().c_str());
191  }
192 
193  if (list != nullptr) {
194  curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
195  }
196 }
197 
199  m_offset = 0;
200  m_status_code = -1;
201  m_content_length = -1;
202  m_push_length = -1;
203  m_recv_all_headers = false;
204  m_recv_status_line = false;
205  m_repr_digests.clear();
206 }
207 
208 size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
209 {
210  State *obj = static_cast<State*>(userdata);
211  std::string header(buffer, size*nitems);
212  return obj->Header(header);
213 }
214 
215 int State::Header(const std::string &header) {
216  //printf("Received remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
217  if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
218  m_recv_all_headers = false;
219  m_recv_status_line = false;
220  }
221  if (!m_recv_status_line) {
222  std::stringstream ss(header);
223  std::string item;
224  if (!std::getline(ss, item, ' ')) return 0;
225  m_resp_protocol = item;
226  //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
227  if (!std::getline(ss, item, ' ')) return 0;
228  try {
229  m_status_code = std::stol(item);
230  } catch (...) {
231  return 0;
232  }
233  m_recv_status_line = true;
234  } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
235  m_recv_all_headers = true;
236  }
237  else if (header != "\r\n") {
238  // Parse the header
239  std::size_t found = header.find(":");
240  if (found != std::string::npos) {
241  std::string header_name = header.substr(0, found);
242  std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
243  std::string header_value = header.substr(found+1);
244  if (header_name == "content-length")
245  {
246  try {
247  m_content_length = std::stoll(header_value);
248  } catch (...) {
249  // Header unparseable -- not a great sign, fail request.
250  //printf("Content-length header unparseable\n");
251  return 0;
252  }
253  }
254  if(header_name == "repr-digest") {
255  XrdHttpHeaderUtils::parseReprDigest(header_value,m_repr_digests);
256  }
257  } else {
258  // Non-empty header that isn't the status line, but no ':' present --
259  // malformed request?
260  //printf("Malformed header: %s\n", header.c_str());
261  return 0;
262  }
263  }
264  return header.size();
265 }
266 
267 size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
268  State *obj = static_cast<State*>(userdata);
269  if (obj->GetStatusCode() < 0) {
270  return 0;
271  } // malformed request - got body before headers.
272  if (obj->GetStatusCode() >= 400) {
273  obj->m_error_buf += std::string(static_cast<char*>(buffer),
274  std::min(static_cast<size_t>(1024), size*nitems));
275  // Record error messages until we hit a KB; at that point, fail out.
276  if (obj->m_error_buf.size() >= 1024)
277  return 0;
278  else
279  return size*nitems;
280  } // Status indicates failure.
281  return obj->Write(static_cast<char*>(buffer), size*nitems);
282 }
283 
289 size_t State::PushRespCB(void *buffer, size_t size, size_t nitems, void *userdata) {
290  State *obj = static_cast<State*>(userdata);
291  // Note: The obj's status code is set by the HeaderCB once there's a reply from the passive server
292  if (obj->GetStatusCode() < 0) {
293  return 0;
294  } // malformed request - got body before headers.
295  if (obj->GetStatusCode() >= 400) {
296  obj->m_error_buf += std::string(static_cast<char*>(buffer),
297  std::min(static_cast<size_t>(1024), size*nitems));
298  // Record error messages until we hit a KB; at that point, fail out.
299  if (obj->m_error_buf.size() >= 1024)
300  return 0;
301  else
302  return size*nitems;
303  }
304  return size*nitems;
305 }
306 
307 ssize_t State::Write(char *buffer, size_t size) {
308  ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
309  if (retval == SFS_ERROR) {
310  m_error_buf = m_stream->GetErrorMessage();
311  m_error_code = 1;
312  return -1;
313  }
314  m_offset += retval;
315  return retval;
316 }
317 
319  if (m_push) {
320  return 0;
321  }
322 
323  ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
324  if (retval == SFS_ERROR) {
325  m_error_buf = m_stream->GetErrorMessage();
326  m_error_code = 2;
327  return -1;
328  }
329  m_offset += retval;
330  return retval;
331 }
332 
333 size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
334  State *obj = static_cast<State*>(userdata);
335  if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
336  if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
337  return obj->Read(static_cast<char*>(buffer), size*nitems);
338 }
339 
340 int State::Read(char *buffer, size_t size) {
341  int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
342  if (retval == SFS_ERROR) {
343  return -1;
344  }
345  m_offset += retval;
346  //printf("Read a total of %ld bytes.\n", m_offset);
347  return retval;
348 }
349 
351  CURL *curl = curl_easy_duphandle(m_curl);
352  if (!curl) {
353  throw std::runtime_error("Failed to duplicate existing curl handle.");
354  }
355 
356  State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
357 
358  if (m_headers) {
359  state->m_headers_copy.reserve(m_headers_copy.size());
360  for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
361  header_iter != m_headers_copy.end();
362  header_iter++) {
363  state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
364  state->m_headers_copy.push_back(*header_iter);
365  }
366  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
367  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
368  }
369 
370  return state;
371 }
372 
373 void State::SetTransferParameters(off_t offset, size_t size) {
374  m_start_offset = offset;
375  m_offset = 0;
376  m_content_length = size;
377  std::stringstream ss;
378  ss << offset << "-" << (offset+size-1);
379  curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
380 }
381 
383 {
384  return m_stream->AvailableBuffers();
385 }
386 
387 void State::DumpBuffers() const
388 {
389  m_stream->DumpBuffers();
390 }
391 
393 {
394  if (!m_stream->Finalize()) {
395  m_error_buf = m_stream->GetErrorMessage();
396  m_error_code = 3;
397  return false;
398  }
399  return true;
400 }
401 
403 {
404  // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
405  // library versions, simply omit this information.
406 #if LIBCURL_VERSION_NUM >= 0x071500
407  char *curl_ip = NULL;
408  CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
409  if ((rc != CURLE_OK) || !curl_ip) {
410  return "";
411  }
412  long curl_port = 0;
413  rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
414  if ((rc != CURLE_OK) || !curl_port) {
415  return "";
416  }
417  std::stringstream ss;
418  // libcurl returns IPv6 addresses of the form:
419  // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
420  // However the HTTP-TPC spec says to use the form
421  // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
422  // Hence, we add '[' and ']' whenever a ':' is seen.
423  if (NULL == strchr(curl_ip, ':'))
424  ss << "tcp:" << curl_ip << ":" << curl_port;
425  else
426  ss << "tcp:[" << curl_ip << "]:" << curl_port;
427  return ss.str();
428 #else
429  return "";
430 #endif
431 }
void CURL
#define stat(a, b)
Definition: XrdPosix.hh:105
void getline(uchar *buff, int blen)
#define SFS_ERROR
#define SFS_OK
State * Duplicate()
void Move(State &other)
int GetStatusCode() const
void DumpBuffers() const
void ResetAfterRequest()
void SetTransferParameters(off_t offset, size_t size)
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetupHeadersForHEAD(XrdHttpExtReq &req)
int AvailableBuffers() const
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
void DumpBuffers() const
std::string GetErrorMessage() const
size_t AvailableBuffers() const
int Stat(struct stat *)
std::map< std::string, std::string > & headers
std::map< std::string, std::string > mReprDigest
Repr-Digest map where the key is the digest name and the value is the base64 encoded digest value.
static void parseReprDigest(const std::string &value, std::map< std::string, std::string > &output)
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79