XRootD
XrdHttpTpcTPC.cc
Go to the documentation of this file.
2 #include "XrdNet/XrdNetAddr.hh"
3 #include "XrdNet/XrdNetUtils.hh"
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysFD.hh"
9 #include "XrdVersion.hh"
10 
12 #include "XrdOuc/XrdOucTUtils.hh"
14 #include "XrdHttp/XrdHttpUtils.hh"
15 
16 #include <curl/curl.h>
17 
18 #include <dlfcn.h>
19 #include <fcntl.h>
20 
21 #include <algorithm>
22 #include <memory>
23 #include <sstream>
24 #include <stdexcept>
25 #include <thread>
26 
27 #include "XrdHttpTpcState.hh"
28 #include "XrdHttpTpcStream.hh"
29 #include "XrdHttpTpcTPC.hh"
30 #include <fstream>
31 
32 using namespace TPC;
33 
34 XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
35 
36 uint64_t TPCHandler::m_monid{0};
37 int TPCHandler::m_marker_period = 5;
38 size_t TPCHandler::m_block_size = 16*1024*1024;
39 size_t TPCHandler::m_small_block_size = 1*1024*1024;
40 XrdSysMutex TPCHandler::m_monid_mutex;
41 
43 
44 /******************************************************************************/
45 /* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
46 /******************************************************************************/
47 
48 TPCHandler::TPCLogRecord::~TPCLogRecord()
49 {
50 // Record monitoring data is enabled
51 //
52  if (tpcMonitor)
53  {XrdXrootdTpcMon::TpcInfo monInfo;
54 
55  monInfo.clID = clID.c_str();
56  monInfo.begT = begT;
57  gettimeofday(&monInfo.endT, 0);
58 
59  if (mTpcType == TpcType::Pull)
60  {monInfo.dstURL = local.c_str();
61  monInfo.srcURL = remote.c_str();
62  } else {
63  monInfo.dstURL = remote.c_str();
64  monInfo.srcURL = local.c_str();
66  }
67 
68  if (!status) monInfo.endRC = 0;
69  else if (tpc_status > 0) monInfo.endRC = tpc_status;
70  else monInfo.endRC = 1;
71  monInfo.strm = static_cast<unsigned char>(streams);
72  monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
73  if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
74 
75  tpcMonitor->Report(monInfo);
76  }
77 }
78 
79 /******************************************************************************/
80 /* C u r l D e l e t e r : : o p e r a t o r ( ) */
81 /******************************************************************************/
82 
84 {
85  if (curl) curl_easy_cleanup(curl);
86 }
87 
88 /******************************************************************************/
89 /* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
90 /******************************************************************************/
91 
100 int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
101  TPCLogRecord * rec = (TPCLogRecord *)clientp;
102  if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
103  // We will not reach this callback if the corresponding socket could not have been connected
104  // the socket is already connected only if the packet marking is enabled
105  return CURL_SOCKOPT_ALREADY_CONNECTED;
106  }
107  return CURL_SOCKOPT_OK;
108 }
109 
110 /******************************************************************************/
111 /* o p e n s o c k e t _ c a l l b a c k */
112 /******************************************************************************/
113 
114 
119 int TPCHandler::opensocket_callback(void *clientp,
120  curlsocktype purpose,
121  struct curl_sockaddr *aInfo)
122 {
123  /* CURLSOCKTYPE_IPCXN (for IP based connections) is the only type currently known by curl,
124  * so let's make sure to reject other types if they appear in the furure */
125  if (purpose != CURLSOCKTYPE_IPCXN)
126  return CURL_SOCKET_BAD;
127 
128  if (!aInfo)
129  return CURL_SOCKET_BAD;
130 
131  // Create the socket (note that O_CLOEXEC flag will be set)
132  int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
133 
134  if (fd < 0) {
135  return CURL_SOCKET_BAD;
136  }
137 
138  if (!clientp)
139  return fd;
140 
141  XrdNetAddr thePeer(&(aInfo->addr));
142  TPCLogRecord *rec = static_cast<TPCLogRecord*>(clientp);
143 
144  /* Reject attempts to connect to local/private addresses unless allowed by configuration */
145  if ((!rec->allow_private && thePeer.isPrivate()) || (!rec->allow_local && thePeer.isLocal())) {
146  rec->tpc_status = 403; // Forbidden
147  rec->m_log->Emsg(rec->log_prefix.c_str(),
148  "Connection to local/private address is forbidden");
149  close(fd);
150  return CURL_SOCKET_BAD;
151  }
152 
153  rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6) && !thePeer.isMapped());
154 
155  std::stringstream connectErrMsg;
156  if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
157  rec->m_log->Emsg(rec->log_prefix.c_str(), "Unable to connect socket: ", connectErrMsg.str().c_str());
158  close(fd);
159  return CURL_SOCKET_BAD;
160  }
161 
162  return fd;
163 }
164 
165 int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
166  TPCLogRecord * rec = (TPCLogRecord *)clientp;
167 
168  // Destroy the PMark handle associated to the file descriptor before closing it.
169  // Otherwise, we would lose the socket usage information if the socket is closed before
170  // the PMark handle is closed.
171  rec->pmarkManager.endPmark(fd);
172 
173  return close(fd);
174 }
175 
176 /******************************************************************************/
177 /* p r e p a r e U R L */
178 /******************************************************************************/
179 
180 // See XrdHttpTpcUtils::prepareOpenURL() documentation
181 std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
182  return XrdHttpTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap);
183 }
184 
185 /******************************************************************************/
186 /* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
187 /******************************************************************************/
188 
189 // When processing a redirection from the filesystem layer, it is permitted to return
190 // some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
191 // more permissive than a URI (basically, only '&' and '=' are disallowed while some
192 // URI parsers may dislike characters like '"'). This function takes an opaque string
193 // (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
194 std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
195 {
196  std::stringstream parser(opaque);
197  std::string sequence;
198  std::stringstream output;
199  bool first = true;
200  while (getline(parser, sequence, '&')) {
201  if (sequence.empty()) {continue;}
202  size_t equal_pos = sequence.find('=');
203  char *val = NULL;
204  if (equal_pos != std::string::npos)
205  val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
206  // Do not emit parameter if value exists and escaping failed.
207  if (!val && equal_pos != std::string::npos) {continue;}
208 
209  if (!first) output << "&";
210  first = false;
211  output << sequence.substr(0, equal_pos);
212  if (val) {
213  output << "=" << val;
214  curl_free(val);
215  }
216  }
217  return output.str();
218 }
219 
220 /******************************************************************************/
221 /* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
222 /******************************************************************************/
223 
224 void
225 TPCHandler::ConfigureCurlCA(CURL *curl)
226 {
227  auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
228  auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
229  if (!ca_filename.empty() && !crl_filename.empty()) {
230  curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
231  //Check that the CRL file contains at least one entry before setting this option to curl
232  //Indeed, an empty CRL file will make curl unhappy and therefore will fail
233  //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
234  std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
235  if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
236  curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
237  } else {
238  std::ostringstream oss;
239  oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
240  m_log.Log(Warning,"TpcHandler",oss.str().c_str());
241  }
242  }
243  else if (!m_cadir.empty()) {
244  curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
245  }
246  if (!m_cafile.empty()) {
247  curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
248  }
249 }
250 
251 
252 bool TPCHandler::MatchesPath(const char *verb, const char *path) {
253  return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
254 }
255 
256 /******************************************************************************/
257 /* P r e p a r e U R L */
258 /******************************************************************************/
259 
// Rewrites a URL that starts with one of the alias schemes "davs://",
// "s3://" or "s3s://" so that it starts with "https://" instead; the rest of
// the URL is kept as is. Any other URL is returned unchanged. The match is
// case-sensitive.
static std::string PrepareURL(const std::string &url)
{
    const std::string aliases[] = { "davs://", "s3://", "s3s://" };

    for (const std::string &alias : aliases) {
        const bool matches = (url.compare(0, alias.size(), alias) == 0);
        if (matches) {
            std::string rewritten("https://");
            rewritten.append(url, alias.size(), std::string::npos);
            return rewritten;
        }
    }

    return url;
}
270 
// Returns true when the URL starts with "https://" or "http://"
// (case-sensitive prefix match), false otherwise.
static bool IsAllowedScheme(const std::string& url)
{
    const auto startsWith = [&url](const std::string &prefix) {
        return url.compare(0, prefix.size(), prefix) == 0;
    };

    return startsWith("https://") || startsWith("http://");
}
281 
282 /******************************************************************************/
283 /* T P C H a n d l e r : : P r o c e s s R e q */
284 /******************************************************************************/
285 
287  if (req.verb == "OPTIONS") {
288  return ProcessOptionsReq(req);
289  }
290  auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
291  if (header != req.headers.end()) {
292  if (header->second != "none") {
293  m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
294  return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
295  }
296  }
297  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
298  if (header != req.headers.end()) {
299  std::string src = PrepareURL(header->second);
300  if (!IsAllowedScheme(src)) {
301  const char *error_src = "COPY rejected: disallowed scheme in source URL";
302  m_log.Emsg("ProcessReq", error_src, src.c_str());
303  return req.SendSimpleResp(400, NULL, NULL, error_src, 0);
304  }
305  return ProcessPullReq(src, req);
306  }
307  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
308  if (header != req.headers.end()) {
309  const std::string& dst = header->second;
310  if (!IsAllowedScheme(dst)) {
311  const char *error_dst = "COPY rejected: disallowed scheme in destination URL";
312  m_log.Emsg("ProcessReq", error_dst, dst.c_str());
313  return req.SendSimpleResp(400, NULL, NULL, error_dst, 0);
314  }
315  return ProcessPushReq(header->second, req);
316  }
317  m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
318  return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
319 }
320 
321 /******************************************************************************/
322 /* T P C H a n d l e r D e s t r u c t o r */
323 /******************************************************************************/
324 
326  m_sfs = NULL;
327 }
328 
329 /******************************************************************************/
330 /* T P C H a n d l e r C o n s t r u c t o r */
331 /******************************************************************************/
332 
333 TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
334  m_allow_local(false),
335  m_allow_private(true),
336  m_desthttps(false),
337  m_fixed_route(false),
338  m_timeout(60),
339  m_first_timeout(120),
340  m_log(log->logger(), "TPC_"),
341  m_sfs(NULL)
342 {
343  if (!Configure(config, myEnv)) {
344  throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
345  }
346 
347 // Extract out the TPC monitoring object (we share it with xrootd).
348 //
349  XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
350  if (gs)
351  TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
352 }
353 
354 /******************************************************************************/
355 /* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
356 /******************************************************************************/
357 
361 int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
362  return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
363 }
364 
365 /******************************************************************************/
366 /* T P C H a n d l e r : : G e t A u t h z */
367 /******************************************************************************/
368 
369 std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
370  std::string authz;
371  auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
372  if (authz_header != req.headers.end()) {
373  std::stringstream ss;
374  ss << "authz=" << encode_str(authz_header->second);
375  authz += ss.str();
376  }
377  return authz;
378 }
379 
380 /******************************************************************************/
381 /* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
382 /******************************************************************************/
383 
384 int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
385  XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
386 {
387  int port;
388  const char *ptr = error.getErrText(port);
389  if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
390  rec.status = 500;
391  std::stringstream ss;
392  ss << "Internal error: redirect without hostname";
393  logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
394  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
395  }
396 
397  // Construct redirection URL taking into consideration any opaque info
398  std::string rdr_info = ptr;
399  std::string host, opaque;
400  size_t pos = rdr_info.find('?');
401  host = rdr_info.substr(0, pos);
402 
403  if (pos != std::string::npos) {
404  opaque = rdr_info.substr(pos + 1);
405  }
406 
407  std::stringstream ss;
408  ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
409 
410  if (!opaque.empty()) {
411  ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
412  }
413 
414  rec.status = 307;
415  logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
416  return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
417  NULL, 0);
418 }
419 
420 /******************************************************************************/
421 /* T P C H a n d l e r : : O p e n W a i t S t a l l */
422 /******************************************************************************/
423 
424 int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
425  int mode, int openMode, const XrdSecEntity &sec,
426  const std::string &authz)
427 {
428  int open_result;
429  while (1) {
430  int orig_ucap = fh.error.getUCap();
431  fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
432  std::string opaque;
433  size_t pos = resource.find('?');
434  // Extract the path and opaque info from the resource
435  std::string path = resource.substr(0, pos);
436 
437  if (pos != std::string::npos) {
438  opaque = resource.substr(pos + 1);
439  }
440 
441  // Append the authz information if there are some
442  if(!authz.empty()) {
443  opaque += (opaque.empty() ? "" : "&");
444  opaque += authz;
445  }
446  open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
447 
448  if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
449  int secs_to_stall = fh.error.getErrInfo();
450  if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
451  std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
452  }
453  break;
454  }
455  return open_result;
456 }
457 
458 /******************************************************************************/
459 /* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
460 /******************************************************************************/
461 
462 
463 
467 int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
468  bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
469  success = false;
470  curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
471  // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
472  curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
473  CURLcode res;
474  res = curl_easy_perform(curl);
475  //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
476  //don't want the next curl call to do be a HEAD request
477  curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
478  // Reset the CURLOPT_TIMEOUT to no timeout (default)
479  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
480  curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
481 
482  std::stringstream ss;
483 
484  if (state.GetStatusCode() >= 400)
485  res = CURLE_HTTP_RETURNED_ERROR;
486 
487  if (res != CURLE_OK) { /* curl failed */
488  ss << curl_easy_strerror(res);
489  switch (res) {
490  case CURLE_HTTP_RETURNED_ERROR: /* remote side may have returned an error */
491  rec.tpc_status = state.GetStatusCode(); /* relay status received from remote side to the client */
492  ss << ": remote host returned '" << rec.tpc_status << " "
493  << httpStatusToString(rec.tpc_status) << "' while fetching file size";
494  break;
495  case CURLE_COULDNT_CONNECT: /* socket callback may have failed */
496  switch (rec.tpc_status) {
497  case 403:
498  ss << ": connection to local/private addresses is forbidden";
499  break;
500  default:
501  ss << ": internal server failure";
502  rec.tpc_status = 500;
503  }
504  break;
505  default:
506  rec.tpc_status = 500;
507  state.SetErrorCode(500);
508  }
509  }
510 
511  if (rec.tpc_status >= 400) {
512  logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
513  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.tpc_status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
514  }
515 
516  success = true;
517  ss << "Successfully determined remote size for pull request: " << state.GetContentLength();
518  logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
519  return 0;
520 }
521 
522 int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
523  State state(curl,req.tpcForwardCreds);
524  //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
525  //it will fail
526  state.SetupHeaders(req);
527  int result;
528  //In case we cannot get the content length, we return the error to the client
529  if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
530  return result;
531  }
532  contentLength = state.GetContentLength();
533  return result;
534 }
535 
536 /******************************************************************************/
537 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
538 /******************************************************************************/
539 
540 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
541  std::stringstream ss;
542  const std::string crlf = "\n";
543  ss << "Perf Marker" << crlf;
544  ss << "Timestamp: " << time(NULL) << crlf;
545  ss << "Stripe Index: 0" << crlf;
546  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
547  ss << "Total Stripe Count: 1" << crlf;
548  // Include the TCP connection associated with this transfer; used by
549  // the TPC client for monitoring purposes.
550  std::string desc = state.GetConnectionDescription();
551  if (!desc.empty())
552  ss << "RemoteConnections: " << desc << crlf;
553  ss << "End" << crlf;
554  rec.bytes_transferred = state.BytesTransferred();
555  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
556 
557  return req.ChunkResp(ss.str().c_str(), 0);
558 }
559 
560 /******************************************************************************/
561 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
562 /******************************************************************************/
563 
564 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
565  off_t bytes_transferred)
566 {
567  // The 'performance marker' format is largely derived from how GridFTP works
568  // (e.g., the concept of `Stripe` is not quite so relevant here). See:
569  // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
570  // Example marker:
571  // Perf Marker\n
572  // Timestamp: 1537788010\n
573  // Stripe Index: 0\n
574  // Stripe Bytes Transferred: 238745\n
575  // Total Stripe Count: 1\n
576  // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
577  // End\n
578  //
579  std::stringstream ss;
580  const std::string crlf = "\n";
581  ss << "Perf Marker" << crlf;
582  ss << "Timestamp: " << time(NULL) << crlf;
583  ss << "Stripe Index: 0" << crlf;
584  ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
585  ss << "Total Stripe Count: 1" << crlf;
586  // Build a list of TCP connections associated with this transfer; used by
587  // the TPC client for monitoring purposes.
588  bool first = true;
589  std::stringstream ss2;
590  for (std::vector<State*>::const_iterator iter = state.begin();
591  iter != state.end(); iter++)
592  {
593  std::string desc = (*iter)->GetConnectionDescription();
594  if (!desc.empty()) {
595  ss2 << (first ? "" : ",") << desc;
596  first = false;
597  }
598  }
599  if (!first)
600  ss << "RemoteConnections: " << ss2.str() << crlf;
601  ss << "End" << crlf;
602  rec.bytes_transferred = bytes_transferred;
603  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
604 
605  return req.ChunkResp(ss.str().c_str(), 0);
606 }
607 
608 /******************************************************************************/
609 /* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
610 /******************************************************************************/
611 
612 int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
613  TPCLogRecord &rec)
614 {
615  // Create the multi-handle and add in the current transfer to it.
616  CURLM *multi_handle = curl_multi_init();
617  if (!multi_handle) {
618  rec.status = 500;
619  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
620  "Failed to initialize a libcurl multi-handle");
621  std::stringstream ss;
622  ss << "Failed to initialize internal server memory";
623  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
624  }
625 
626  //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
627 
628  CURLMcode mres;
629  mres = curl_multi_add_handle(multi_handle, curl);
630  if (mres) {
631  rec.status = 500;
632  std::stringstream ss;
633  ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
634  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
635  curl_multi_cleanup(multi_handle);
636  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
637  }
638 
639  // Start response to client prior to the first call to curl_multi_perform
640  int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
641  if (retval) {
642  curl_multi_cleanup(multi_handle);
643  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
644  "Failed to send the initial response to the TPC client");
645  return retval;
646  } else {
647  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
648  "Initial transfer response sent to the TPC client");
649  }
650 
651  // Transfer loop: use curl to actually run the transfer, but periodically
652  // interrupt things to send back performance updates to the client.
653  int running_handles = 1;
654  time_t last_marker = 0;
655  // Track how long it's been since the last time we recorded more bytes being transferred.
656  off_t last_advance_bytes = 0;
657  time_t last_advance_time = time(NULL);
658  time_t transfer_start = last_advance_time;
659  CURLcode res = static_cast<CURLcode>(-1);
660  do {
661  time_t now = time(NULL);
662  time_t next_marker = last_marker + m_marker_period;
663  if (now >= next_marker) {
664  off_t bytes_xfer = state.BytesTransferred();
665  if (bytes_xfer > last_advance_bytes) {
666  last_advance_bytes = bytes_xfer;
667  last_advance_time = now;
668  }
669  if (SendPerfMarker(req, rec, state)) {
670  curl_multi_remove_handle(multi_handle, curl);
671  curl_multi_cleanup(multi_handle);
672  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
673  "Failed to send a perf marker to the TPC client");
674  return -1;
675  }
676  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
677  if (now > last_advance_time + timeout) {
678  const char *log_prefix = rec.log_prefix.c_str();
679  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
680 
681  state.SetErrorCode(10);
682  std::stringstream ss;
683  ss << "Transfer failed because no bytes have been "
684  << (tpc_pull ? "received from the source (pull mode) in "
685  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
686  state.SetErrorMessage(ss.str());
687  curl_multi_remove_handle(multi_handle, curl);
688  curl_multi_cleanup(multi_handle);
689  break;
690  }
691  last_marker = now;
692  }
693  // The transfer will start after this point, notify the packet marking manager
694  rec.pmarkManager.startTransfer();
695  mres = curl_multi_perform(multi_handle, &running_handles);
696  if (mres == CURLM_CALL_MULTI_PERFORM) {
697  // curl_multi_perform should be called again immediately. On newer
698  // versions of curl, this is no longer used.
699  continue;
700  } else if (mres != CURLM_OK) {
701  break;
702  } else if (running_handles == 0) {
703  break;
704  }
705 
706  rec.pmarkManager.beginPMarks();
707  //printf("There are %d running handles\n", running_handles);
708 
709  // Harvest any messages, looking for CURLMSG_DONE.
710  CURLMsg *msg;
711  do {
712  int msgq = 0;
713  msg = curl_multi_info_read(multi_handle, &msgq);
714  if (msg && (msg->msg == CURLMSG_DONE)) {
715  CURL *easy_handle = msg->easy_handle;
716  res = msg->data.result;
717  curl_multi_remove_handle(multi_handle, easy_handle);
718  }
719  } while (msg);
720 
721  int64_t max_sleep_time = next_marker - time(NULL);
722  if (max_sleep_time <= 0) {
723  continue;
724  }
725  int fd_count;
726  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
727  if (mres != CURLM_OK) {
728  break;
729  }
730  } while (running_handles);
731 
732  if (mres != CURLM_OK) {
733  std::stringstream ss;
734  ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
735  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
736 
737  curl_multi_remove_handle(multi_handle, curl);
738  curl_multi_cleanup(multi_handle);
739 
740  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
741  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
742  "Failed to send error message to the TPC client");
743  return retval;
744  }
745  return req.ChunkResp(NULL, 0);
746  }
747 
748  // Harvest any messages, looking for CURLMSG_DONE.
749  CURLMsg *msg;
750  do {
751  int msgq = 0;
752  msg = curl_multi_info_read(multi_handle, &msgq);
753  if (msg && (msg->msg == CURLMSG_DONE)) {
754  CURL *easy_handle = msg->easy_handle;
755  res = msg->data.result;
756  curl_multi_remove_handle(multi_handle, easy_handle);
757  }
758  } while (msg);
759 
760  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
761  curl_multi_remove_handle(multi_handle, curl);
762  curl_multi_cleanup(multi_handle);
763  std::stringstream ss;
764  ss << "Internal state error in libcurl";
765  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
766 
767  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
768  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
769  "Failed to send error message to the TPC client");
770  return retval;
771  }
772  return req.ChunkResp(NULL, 0);
773  }
774  curl_multi_cleanup(multi_handle);
775 
776  state.Flush();
777 
778  rec.bytes_transferred = state.BytesTransferred();
779  rec.tpc_status = state.GetStatusCode();
780 
781  // Explicitly finalize the stream (which will close the underlying file
782  // handle) before the response is sent. In some cases, subsequent HTTP
783  // requests can occur before the filesystem is done closing the handle -
784  // and those requests may occur against partial data.
785  state.Finalize();
786 
787  // Generate the final response back to the client.
788  std::stringstream ss;
789  bool success = false;
790  if (state.GetStatusCode() >= 400) {
791  std::string err = state.GetErrorMessage();
792  std::stringstream ss2;
793  ss2 << "Remote side failed with status code " << state.GetStatusCode();
794  if (!err.empty()) {
795  std::replace(err.begin(), err.end(), '\n', ' ');
796  ss2 << "; error message: \"" << err << "\"";
797  }
798  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
799  ss << generateClientErr(ss2, rec);
800  } else if (state.GetErrorCode()) {
801  std::string err = state.GetErrorMessage();
802  if (err.empty()) {err = "(no error message provided)";}
803  else {std::replace(err.begin(), err.end(), '\n', ' ');}
804  std::stringstream ss2;
805  ss2 << "Error when interacting with local filesystem: " << err;
806  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
807  ss << generateClientErr(ss2, rec);
808  } else if (res != CURLE_OK) {
809  std::stringstream ss2;
810  ss2 << "Internal transfer failure";
811  std::stringstream ss3;
812  ss3 << ss2.str() << ": " << curl_easy_strerror(res);
813  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
814  ss << generateClientErr(ss2, rec, res);
815  } else {
816  ss << "success: Created";
817  success = true;
818  }
819 
820  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
821  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
822  "Failed to send last update to remote client");
823  return retval;
824  } else if (success) {
825  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
826  rec.status = 0;
827  }
828  return req.ChunkResp(NULL, 0);
829 }
830 
831 /******************************************************************************/
832 /* T P C H a n d l e r : : P r o c e s s P u s h R e q */
833 /******************************************************************************/
834 
835 int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
836  TPCLogRecord rec(req, TpcType::Push);
837  rec.allow_local = m_allow_local;
838  rec.allow_private = m_allow_private;
839  rec.log_prefix = "PushRequest";
840  rec.local = req.resource;
841  rec.remote = resource;
842  rec.m_log = &m_log;
843  char *name = req.GetSecEntity().name;
844  req.GetClientID(rec.clID);
845  if (name) rec.name = name;
846  logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
847 
848  ManagedCurlHandle curlPtr(curl_easy_init());
849  auto curl = curlPtr.get();
850  if (!curl) {
851  std::stringstream ss;
852  ss << "Failed to initialize internal transfer resources";
853  rec.status = 500;
854  logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
855  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
856  }
857  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
858  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
859 #if CURL_AT_LEAST_VERSION(7, 85, 0)
860  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
861  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
862 #else
863  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
864  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
865  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
866 #endif
867  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
868  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
869  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
870  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
871  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
872  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
873 
874  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
875  std::string redirect_resource = req.resource;
876  if (query_header != req.headers.end()) {
877  redirect_resource = query_header->second;
878  }
879 
880  AtomicBeg(m_monid_mutex);
881  uint64_t file_monid = AtomicInc(m_monid);
882  AtomicEnd(m_monid_mutex);
883  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
884  if (!fh.get()) {
885  rec.status = 500;
886  std::stringstream ss;
887  ss << "Failed to initialize internal transfer file handle";
888  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
889  ss.str());
890  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
891  }
892  std::string full_url = prepareURL(req);
893 
894  std::string authz = GetAuthz(req);
895 
896  int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
897  req.GetSecEntity(), authz);
898  if (SFS_REDIRECT == open_results) {
899  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
900  return result;
901  } else if (SFS_OK != open_results) {
902  int code;
903  std::stringstream ss;
904  const char *msg = fh->error.getErrText(code);
905  if (msg == NULL) ss << "Failed to open local resource";
906  else ss << msg;
907  rec.status = mapErrNoToHttp(code);
908  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
909  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
910  fh->close();
911  return resp_result;
912  }
913  ConfigureCurlCA(curl);
914  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
915 
916  Stream stream(std::move(fh), 0, 0, m_log);
917  State state(0, stream, curl, true, req.tpcForwardCreds);
918  state.SetupHeaders(req);
919 
920  return RunCurlWithUpdates(curl, req, state, rec);
921 }
922 
923 /******************************************************************************/
924 /* T P C H a n d l e r : : P r o c e s s P u l l R e q */
925 /******************************************************************************/
926 
927 int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
928  TPCLogRecord rec(req,TpcType::Pull);
929  rec.allow_local = m_allow_local;
930  rec.allow_private = m_allow_private;
931  rec.log_prefix = "PullRequest";
932  rec.local = req.resource;
933  rec.remote = resource;
934  rec.m_log = &m_log;
935  char *name = req.GetSecEntity().name;
936  req.GetClientID(rec.clID);
937  if (name) rec.name = name;
938  logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
939 
940  ManagedCurlHandle curlPtr(curl_easy_init());
941  auto curl = curlPtr.get();
942  if (!curl) {
943  std::stringstream ss;
944  ss << "Failed to initialize internal transfer resources";
945  rec.status = 500;
946  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
947  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
948  }
949 
950  // ddavila 2023-01-05:
951  // The following change was required by the Rucio/SENSE project where
952  // multiple IP addresses, each from a different subnet, are assigned to a
953  // single server and routed differently by SENSE.
954  // The above requires the server to utilize the same IP, that was used to
955  // start the TPC, for the resolution of the given TPC instead of
956  // using any of the IPs available.
957  if (m_fixed_route){
958  XrdNetAddr *nP;
959  int numIP = 0;
960  char buff[1024];
961  char * ip;
962 
963  // Get the hostname used to contact the server from the http header
964  auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
965  std::string host_used;
966  if (host_header != req.headers.end()) {
967  host_used = host_header->second;
968  }
969 
970  // Get the IP addresses associated with the above hostname
971  XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
972  int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
973  ip = (char *)malloc(ip_size-1);
974 
975  // Substring to get only the address, remove brackets and garbage
976  memcpy(ip, buff+1, ip_size-2);
977  ip[ip_size-2]='\0';
978  logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
979 
980  curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
981  }
982  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
983  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
984 #if CURL_AT_LEAST_VERSION(7, 85, 0)
985  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
986  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
987 #else
988  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
989  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
990  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
991 #endif
992  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
993  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
994  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
995  curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
996  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
997  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
998  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
999  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
1000  if (!fh.get()) {
1001  std::stringstream ss;
1002  ss << "Failed to initialize internal transfer file handle";
1003  rec.status = 500;
1004  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1005  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1006  }
1007  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1008  std::string redirect_resource = req.resource;
1009  if (query_header != req.headers.end()) {
1010  redirect_resource = query_header->second;
1011  }
1013  auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1014  if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1015  if (! usingEC) mode = SFS_O_TRUNC;
1016  }
1017  int streams = 1;
1018  {
1019  auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1020  if (streams_header != req.headers.end()) {
1021  int stream_req = -1;
1022  try {
1023  stream_req = std::stol(streams_header->second);
1024  } catch (...) { // Handled below
1025  }
1026  if (stream_req < 0 || stream_req > 100) {
1027  std::stringstream ss;
1028  ss << "Invalid request for number of streams";
1029  rec.status = 400;
1030  logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1031  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1032  }
1033  streams = stream_req == 0 ? 1 : stream_req;
1034  }
1035  }
1036  rec.streams = streams;
1037  std::string full_url = prepareURL(req);
1038  std::string authz = GetAuthz(req);
1039  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1040  ConfigureCurlCA(curl);
1041  uint64_t sourceFileContentLength = 0;
1042  {
1043  //Get the content-length of the source file and pass it to the OSS layer
1044  //during the open
1045  bool success;
1046  GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
1047  if(success) {
1048  //In the case we cannot get the information from the source server (offline or other error)
1049  //we just don't add the size information to the opaque of the local file to open
1050  full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
1051  } else {
1052  // In the case the GetContentLength is not successful, an error will be returned to the client
1053  // just exit here so we don't open the file!
1054  return 0;
1055  }
1056  }
1057  int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1058  0644 | SFS_O_MKPTH,
1059  req.GetSecEntity(), authz);
1060  if (SFS_REDIRECT == open_result) {
1061  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1062  return result;
1063  } else if (SFS_OK != open_result) {
1064  int code;
1065  std::stringstream ss;
1066  const char *msg = fh->error.getErrText(code);
1067  if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1068  else ss << msg;
1069  rec.status = mapErrNoToHttp(code);
1070  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1071  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1072  generateClientErr(ss, rec).c_str(), 0);
1073  fh->close();
1074  return resp_result;
1075  }
1076  Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1077  State state(0, stream, curl, false, req.tpcForwardCreds);
1078  state.SetupHeaders(req);
1079  state.SetContentLength(sourceFileContentLength);
1080 
1081  if (streams > 1) {
1082  return RunCurlWithStreams(req, state, streams, rec);
1083  } else {
1084  return RunCurlWithUpdates(curl, req, state, rec);
1085  }
1086 }
1087 
1088 /******************************************************************************/
1089 /* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1090 /******************************************************************************/
1091 
1092 void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1093  const std::string &event, const std::string &message)
1094 {
1095  if (!(m_log.getMsgMask() & mask)) {return;}
1096 
1097  std::stringstream ss;
1098  ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1099  if (rec.name.empty())
1100  ss << ", user=(anonymous)";
1101  else
1102  ss << ", user=" << rec.name;
1103  if (rec.streams != 1)
1104  ss << ", streams=" << rec.streams;
1105  if (rec.bytes_transferred >= 0)
1106  ss << ", bytes_transferred=" << rec.bytes_transferred;
1107  if (rec.status >= 0)
1108  ss << ", status=" << rec.status;
1109  if (rec.tpc_status >= 0)
1110  ss << ", tpc_status=" << rec.tpc_status;
1111  if (!message.empty())
1112  ss << "; " << message;
1113  m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1114 }
1115 
1116 std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1117  std::stringstream ssret;
1118  ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1119  if(cCode != CURLcode::CURLE_OK) {
1120  ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1121  }
1122  return ssret.str();
1123 }
1124 /******************************************************************************/
1125 /* X r d H t t p G e t E x t H a n d l e r */
1126 /******************************************************************************/
1127 
1128 extern "C" {
1129 
1130 XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1131  if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1132  log->Emsg("TPCInitialize", "libcurl failed to initialize");
1133  return NULL;
1134  }
1135 
1136  TPCHandler *retval{NULL};
1137  if (!config) {
1138  log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1139  return NULL;
1140  }
1141  try {
1142  log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1143  retval = new TPCHandler(log, config, myEnv);
1144  } catch (std::runtime_error &re) {
1145  log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1146  //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1147  }
1148  return retval;
1149 }
1150 
1151 }
void CURL
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
static std::string PrepareURL(const std::string &url)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
static bool IsAllowedScheme(const std::string &url)
int mapErrNoToHttp(int errNo)
std::string httpStatusToString(int status)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
#define close(a)
Definition: XrdPosix.hh:48
bool Debug
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
@ Error
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
std::string verb
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
Definition: XrdNetUtils.cc:274
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
int getMsgMask()
Definition: XrdSysError.hh:156
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:133
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
@ Warning
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info
static const int isaPush