XRootD
XrdHttpTpcTPC.cc
Go to the documentation of this file.
2 #include "XrdNet/XrdNetAddr.hh"
3 #include "XrdNet/XrdNetUtils.hh"
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysFD.hh"
9 #include "XrdVersion.hh"
10 
12 #include "XrdOuc/XrdOucTUtils.hh"
14 #include "XrdHttp/XrdHttpUtils.hh"
15 
16 #include <curl/curl.h>
17 
18 #include <dlfcn.h>
19 #include <fcntl.h>
20 
21 #include <algorithm>
22 #include <memory>
23 #include <sstream>
24 #include <stdexcept>
25 #include <thread>
26 
27 #include "XrdHttpTpcState.hh"
28 #include "XrdHttpTpcStream.hh"
29 #include "XrdHttpTpcTPC.hh"
30 #include <fstream>
31 
32 using namespace TPC;
33 
34 XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
35 
36 uint64_t TPCHandler::m_monid{0};
37 int TPCHandler::m_marker_period = 5;
38 size_t TPCHandler::m_block_size = 16*1024*1024;
39 size_t TPCHandler::m_small_block_size = 1*1024*1024;
40 XrdSysMutex TPCHandler::m_monid_mutex;
41 bool TPCHandler::allowMissingCRL = false;
42 
44 
45 /******************************************************************************/
46 /* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
47 /******************************************************************************/
48 
49 TPCHandler::TPCLogRecord::~TPCLogRecord()
50 {
51 // Record monitoring data is enabled
52 //
53  if (tpcMonitor)
54  {XrdXrootdTpcMon::TpcInfo monInfo;
55 
56  monInfo.clID = clID.c_str();
57  monInfo.begT = begT;
58  gettimeofday(&monInfo.endT, 0);
59 
60  if (mTpcType == TpcType::Pull)
61  {monInfo.dstURL = local.c_str();
62  monInfo.srcURL = remote.c_str();
63  } else {
64  monInfo.dstURL = remote.c_str();
65  monInfo.srcURL = local.c_str();
67  }
68 
69  if (!status) monInfo.endRC = 0;
70  else if (tpc_status > 0) monInfo.endRC = tpc_status;
71  else monInfo.endRC = 1;
72  monInfo.strm = static_cast<unsigned char>(streams);
73  monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
74  if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
75 
76  tpcMonitor->Report(monInfo);
77  }
78 }
79 
80 /******************************************************************************/
81 /* C u r l D e l e t e r : : o p e r a t o r ( ) */
82 /******************************************************************************/
83 
85 {
86  if (curl) curl_easy_cleanup(curl);
87 }
88 
89 /******************************************************************************/
90 /* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
91 /******************************************************************************/
92 
101 int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
102  TPCLogRecord * rec = (TPCLogRecord *)clientp;
103  if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
104  // We will not reach this callback if the corresponding socket could not have been connected
105  // the socket is already connected only if the packet marking is enabled
106  return CURL_SOCKOPT_ALREADY_CONNECTED;
107  }
108  return CURL_SOCKOPT_OK;
109 }
110 
111 /******************************************************************************/
112 /* o p e n s o c k e t _ c a l l b a c k */
113 /******************************************************************************/
114 
115 
120 int TPCHandler::opensocket_callback(void *clientp,
121  curlsocktype purpose,
122  struct curl_sockaddr *aInfo)
123 {
124  /* CURLSOCKTYPE_IPCXN (for IP based connections) is the only type currently known by curl,
125  * so let's make sure to reject other types if they appear in the furure */
126  if (purpose != CURLSOCKTYPE_IPCXN)
127  return CURL_SOCKET_BAD;
128 
129  if (!aInfo)
130  return CURL_SOCKET_BAD;
131 
132  // Create the socket (note that O_CLOEXEC flag will be set)
133  int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
134 
135  if (fd < 0) {
136  return CURL_SOCKET_BAD;
137  }
138 
139  if (!clientp)
140  return fd;
141 
142  XrdNetAddr thePeer(&(aInfo->addr));
143  TPCLogRecord *rec = static_cast<TPCLogRecord*>(clientp);
144 
145  /* Reject attempts to connect to local/private addresses unless allowed by configuration */
146  if ((!rec->allow_private && thePeer.isPrivate()) || (!rec->allow_local && thePeer.isLocal())) {
147  rec->tpc_status = 403; // Forbidden
148  rec->m_log->Emsg(rec->log_prefix.c_str(),
149  "Connection to local/private address is forbidden");
150  close(fd);
151  return CURL_SOCKET_BAD;
152  }
153 
154  rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6) && !thePeer.isMapped());
155 
156  std::stringstream connectErrMsg;
157  if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
158  rec->m_log->Emsg(rec->log_prefix.c_str(), "Unable to connect socket: ", connectErrMsg.str().c_str());
159  close(fd);
160  return CURL_SOCKET_BAD;
161  }
162 
163  return fd;
164 }
165 
166 int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
167  TPCLogRecord * rec = (TPCLogRecord *)clientp;
168 
169  // Destroy the PMark handle associated to the file descriptor before closing it.
170  // Otherwise, we would lose the socket usage information if the socket is closed before
171  // the PMark handle is closed.
172  rec->pmarkManager.endPmark(fd);
173 
174  return close(fd);
175 }
176 
177 /******************************************************************************/
178 /* s s l _ c t x _ c a l l b a c k */
179 /******************************************************************************/
180 
187 int TPCHandler::ssl_ctx_callback(CURL *curl, void *ssl_ctx, void *clientp) {
188  //TPCLogRecord * rec = (TPCLogRecord *)clientp;
189  SSL_CTX* ctx = static_cast<SSL_CTX*>(ssl_ctx);
190  SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, verify_callback);
191  return CURL_SOCKOPT_OK;
192 }
193 
194 int TPCHandler::verify_callback(int preverify_ok, X509_STORE_CTX* ctx) {
195  if (preverify_ok == 1) return 1;
196 
197  int err = X509_STORE_CTX_get_error(ctx);
198 
199  if (err == X509_V_ERR_UNABLE_TO_GET_CRL) {
200  X509_STORE_CTX_set_error(ctx, X509_V_OK);
201  return 1;
202  }
203 
204  return 0;
205 }
206 
207 /******************************************************************************/
208 /* p r e p a r e U R L */
209 /******************************************************************************/
210 
211 // See XrdHttpTpcUtils::prepareOpenURL() documentation
212 std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
213  XrdHttpTpcUtils::PrepareOpenURLParams parms {req.resource, req.headers, hdr2cgimap,req.mReprDigest};
214  return XrdHttpTpcUtils::prepareOpenURL(parms);
215 }
216 
217 bool TPCHandler::mismatchReprDigest(const std::map<std::string, std::string> & passiveSrvReprDigest, XrdHttpExtReq &req,
218  TPCLogRecord &rec) {
219  if(passiveSrvReprDigest.size()) {
220  for (const auto & [digestName, digestValue]: passiveSrvReprDigest) {
221  auto clientDigestMatch = req.mReprDigest.find(digestName);
222  if (clientDigestMatch != req.mReprDigest.end()) {
223  // We found a checksum type match between the client-provided one and the source server-provided one
224  if (clientDigestMatch->second != digestValue) {
225  // The checksum value does not match, return an error to the client 412 PRECONDITION_FAILED
226  std::stringstream errMsg;
227  errMsg << "Mismatch between client-provided and remote server checksums:"
228  << " client = (" << clientDigestMatch->first << "=" << clientDigestMatch->second << ")"
229  << " server = (" << digestName << "=" << digestValue << ")";
230  logTransferEvent(LogMask::Error, rec, "REPRDIGEST_VERIFY_FAIL", errMsg.str());
231  rec.status=412;
232  req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(errMsg, rec, CURLcode::CURLE_OK).c_str(), 0);
233  return true;
234  }
235  }
236  }
237  }
238  return false;
239 }
240 
241 /******************************************************************************/
242 /* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
243 /******************************************************************************/
244 
245 // When processing a redirection from the filesystem layer, it is permitted to return
246 // some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
247 // more permissive than a URI (basically, only '&' and '=' are disallowed while some
248 // URI parsers may dislike characters like '"'). This function takes an opaque string
249 // (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
250 std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
251 {
252  std::stringstream parser(opaque);
253  std::string sequence;
254  std::stringstream output;
255  bool first = true;
256  while (getline(parser, sequence, '&')) {
257  if (sequence.empty()) {continue;}
258  size_t equal_pos = sequence.find('=');
259  char *val = NULL;
260  if (equal_pos != std::string::npos)
261  val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
262  // Do not emit parameter if value exists and escaping failed.
263  if (!val && equal_pos != std::string::npos) {continue;}
264 
265  if (!first) output << "&";
266  first = false;
267  output << sequence.substr(0, equal_pos);
268  if (val) {
269  output << "=" << val;
270  curl_free(val);
271  }
272  }
273  return output.str();
274 }
275 
276 /******************************************************************************/
277 /* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
278 /******************************************************************************/
279 
280 void
281 TPCHandler::ConfigureCurlCA(CURL *curl)
282 {
283  auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
284  auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
285  if (!ca_filename.empty() && !crl_filename.empty()) {
286  curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
287  //Check that the CRL file contains at least one entry before setting this option to curl
288  //Indeed, an empty CRL file will make curl unhappy and therefore will fail
289  //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
290  std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
291  if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
292  curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
293  if (allowMissingCRL) {
294  // No need to set the callback if there is no need to do it
295  curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
296  }
297  } else {
298  std::ostringstream oss;
299  oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
300  m_log.Log(Warning,"TpcHandler",oss.str().c_str());
301  }
302  }
303  else if (!m_cadir.empty()) {
304  curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
305  }
306  if (!m_cafile.empty()) {
307  curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
308  }
309 }
310 
311 
312 bool TPCHandler::MatchesPath(const char *verb, const char *path) {
313  return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
314 }
315 
316 /******************************************************************************/
317 /* P r e p a r e U R L */
318 /******************************************************************************/
319 
320 static std::string PrepareURL(const std::string &url)
321 {
322  const std::string replace_schemes[] = { "davs://", "s3://", "s3s://" };
323 
324  for (const auto& s : replace_schemes)
325  if (url.compare(0, s.size(), s) == 0)
326  return "https://" + url.substr(s.size());
327 
328  return url;
329 }
330 
331 static bool IsAllowedScheme(const std::string& url)
332 {
333  const std::string allowed_schemes[] = { "https://", "http://" };
334 
335  for (const auto& s : allowed_schemes)
336  if (url.compare(0, s.size(), s) == 0)
337  return true;
338 
339  return false;
340 }
341 
342 /******************************************************************************/
343 /* T P C H a n d l e r : : P r o c e s s R e q */
344 /******************************************************************************/
345 
347  if (req.verb == "OPTIONS") {
348  return ProcessOptionsReq(req);
349  }
350  auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
351  if (header != req.headers.end()) {
352  if (header->second != "none") {
353  m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
354  return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
355  }
356  }
357  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
358  if (header != req.headers.end()) {
359  std::string src = PrepareURL(header->second);
360  if (!IsAllowedScheme(src)) {
361  const char *error_src = "COPY rejected: disallowed scheme in source URL";
362  m_log.Emsg("ProcessReq", error_src, src.c_str());
363  return req.SendSimpleResp(400, NULL, NULL, error_src, 0);
364  }
365  return ProcessPullReq(src, req);
366  }
367  header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
368  if (header != req.headers.end()) {
369  const std::string& dst = header->second;
370  if (!IsAllowedScheme(dst)) {
371  const char *error_dst = "COPY rejected: disallowed scheme in destination URL";
372  m_log.Emsg("ProcessReq", error_dst, dst.c_str());
373  return req.SendSimpleResp(400, NULL, NULL, error_dst, 0);
374  }
375  return ProcessPushReq(header->second, req);
376  }
377  m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
378  return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
379 }
380 
381 /******************************************************************************/
382 /* T P C H a n d l e r D e s t r u c t o r */
383 /******************************************************************************/
384 
386  m_sfs = NULL;
387 }
388 
389 /******************************************************************************/
390 /* T P C H a n d l e r C o n s t r u c t o r */
391 /******************************************************************************/
392 
393 TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
394  m_allow_local(false),
395  m_allow_private(true),
396  m_desthttps(false),
397  m_fixed_route(false),
398  m_timeout(60),
399  m_first_timeout(120),
400  m_log(log->logger(), "TPC_"),
401  m_sfs(NULL)
402 {
403  if (!Configure(config, myEnv)) {
404  throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
405  }
406 
407 // Extract out the TPC monitoring object (we share it with xrootd).
408 //
409  XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
410  if (gs)
411  TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
412 }
413 
414 /******************************************************************************/
415 /* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
416 /******************************************************************************/
417 
421 int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
422  return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
423 }
424 
425 /******************************************************************************/
426 /* T P C H a n d l e r : : G e t A u t h z */
427 /******************************************************************************/
428 
429 std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
430  std::string authz;
431  auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
432  if (authz_header != req.headers.end()) {
433  std::stringstream ss;
434  ss << "authz=" << encode_str(authz_header->second);
435  authz += ss.str();
436  }
437  return authz;
438 }
439 
440 /******************************************************************************/
441 /* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
442 /******************************************************************************/
443 
444 int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
445  XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
446 {
447  int port;
448  const char *ptr = error.getErrText(port);
449  if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
450  rec.status = 500;
451  std::stringstream ss;
452  ss << "Internal error: redirect without hostname";
453  logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
454  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
455  }
456 
457  // Construct redirection URL taking into consideration any opaque info
458  std::string rdr_info = ptr;
459  std::string host, opaque;
460  size_t pos = rdr_info.find('?');
461  host = rdr_info.substr(0, pos);
462 
463  if (pos != std::string::npos) {
464  opaque = rdr_info.substr(pos + 1);
465  }
466 
467  std::stringstream ss;
468  ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
469 
470  if (!opaque.empty()) {
471  ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
472  }
473 
474  rec.status = 307;
475  logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
476  return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
477  NULL, 0);
478 }
479 
480 /******************************************************************************/
481 /* T P C H a n d l e r : : O p e n W a i t S t a l l */
482 /******************************************************************************/
483 
484 int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
485  int mode, int openMode, const XrdSecEntity &sec,
486  const std::string &authz)
487 {
488  int open_result;
489  while (1) {
490  int orig_ucap = fh.error.getUCap();
491  fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
492  std::string opaque;
493  size_t pos = resource.find('?');
494  // Extract the path and opaque info from the resource
495  std::string path = resource.substr(0, pos);
496 
497  if (pos != std::string::npos) {
498  opaque = resource.substr(pos + 1);
499  }
500 
501  // Append the authz information if there are some
502  if(!authz.empty()) {
503  opaque += (opaque.empty() ? "" : "&");
504  opaque += authz;
505  }
506  open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
507 
508  if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
509  int secs_to_stall = fh.error.getErrInfo();
510  if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
511  std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
512  }
513  break;
514  }
515  return open_result;
516 }
517 
518 /******************************************************************************/
519 /* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
520 /******************************************************************************/
521 
522 
523 
527 int TPCHandler::PerformHEADRequest(CURL *curl, XrdHttpExtReq &req, State &state,
528  bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
529  success = false;
530  curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
531  // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
532  curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
533  CURLcode res;
534  res = curl_easy_perform(curl);
535  //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
536  //don't want the next curl call to do be a HEAD request
537  curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
538  // Reset the CURLOPT_TIMEOUT to no timeout (default)
539  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
540  curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
541 
542  std::stringstream ss;
543 
544  if (state.GetStatusCode() >= 400)
545  res = CURLE_HTTP_RETURNED_ERROR;
546 
547  if (res != CURLE_OK) { /* curl failed */
548  ss << curl_easy_strerror(res);
549  switch (res) {
550  case CURLE_HTTP_RETURNED_ERROR: /* remote side may have returned an error */
551  rec.tpc_status = state.GetStatusCode(); /* relay status received from remote side to the client */
552  ss << ": remote host returned '" << rec.tpc_status << " "
553  << httpStatusToString(rec.tpc_status) << "' while fetching file size";
554  break;
555  case CURLE_COULDNT_CONNECT: /* socket callback may have failed */
556  switch (rec.tpc_status) {
557  case 403:
558  ss << ": connection to local/private addresses is forbidden";
559  break;
560  default:
561  ss << ": internal server failure";
562  rec.tpc_status = 500;
563  }
564  break;
565  default:
566  rec.tpc_status = 500;
567  state.SetErrorCode(500);
568  }
569  }
570 
571  if (rec.tpc_status >= 400) {
572  logTransferEvent(LogMask::Error, rec, "HEAD_FAIL", ss.str());
573  return shouldReturnErrorToClient ? req.SendSimpleResp(rec.tpc_status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
574  }
575 
576  success = true;
577  ss << "Successfully determined remote file information for pull request: "
578  << "size=" << state.GetContentLength();
579  if(state.GetReprDigest().size()) {
580  unsigned int cksumIndex = 1;
581  for(const auto & [cksumType,cksumValue]: state.GetReprDigest()) {
582  ss << " chksum" << cksumIndex << "=(" << cksumType << "," << cksumValue << ")";
583  cksumIndex++;
584  }
585  }
586  logTransferEvent(LogMask::Debug, rec, "HEAD_SUCCESS", ss.str());
587  return 0;
588 }
589 
590 int TPCHandler::GetRemoteFileInfoTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, std::map<std::string,std::string> & reprDigest, bool & success, TPCLogRecord &rec) {
591  State state(curl,req.tpcForwardCreds);
592  //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
593  //it will fail
594  state.SetupHeadersForHEAD(req);
595  int result;
596  //In case we cannot get the file HEAD request, we return the error to the client
597  if ((result = PerformHEADRequest(curl, req, state, success, rec)) || !success) {
598  return result;
599  }
600  contentLength = state.GetContentLength();
601  reprDigest = state.GetReprDigest();
602  return result;
603 }
604 
605 /******************************************************************************/
606 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
607 /******************************************************************************/
608 
609 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
610  std::stringstream ss;
611  const std::string crlf = "\n";
612  ss << "Perf Marker" << crlf;
613  ss << "Timestamp: " << time(NULL) << crlf;
614  ss << "Stripe Index: 0" << crlf;
615  ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
616  ss << "Total Stripe Count: 1" << crlf;
617  // Include the TCP connection associated with this transfer; used by
618  // the TPC client for monitoring purposes.
619  std::string desc = state.GetConnectionDescription();
620  if (!desc.empty())
621  ss << "RemoteConnections: " << desc << crlf;
622  ss << "End" << crlf;
623  rec.bytes_transferred = state.BytesTransferred();
624  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
625 
626  return req.ChunkResp(ss.str().c_str(), 0);
627 }
628 
629 /******************************************************************************/
630 /* T P C H a n d l e r : : S e n d P e r f M a r k e r */
631 /******************************************************************************/
632 
633 int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
634  off_t bytes_transferred)
635 {
636  // The 'performance marker' format is largely derived from how GridFTP works
637  // (e.g., the concept of `Stripe` is not quite so relevant here). See:
638  // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
639  // Example marker:
640  // Perf Marker\n
641  // Timestamp: 1537788010\n
642  // Stripe Index: 0\n
643  // Stripe Bytes Transferred: 238745\n
644  // Total Stripe Count: 1\n
645  // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
646  // End\n
647  //
648  std::stringstream ss;
649  const std::string crlf = "\n";
650  ss << "Perf Marker" << crlf;
651  ss << "Timestamp: " << time(NULL) << crlf;
652  ss << "Stripe Index: 0" << crlf;
653  ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
654  ss << "Total Stripe Count: 1" << crlf;
655  // Build a list of TCP connections associated with this transfer; used by
656  // the TPC client for monitoring purposes.
657  bool first = true;
658  std::stringstream ss2;
659  for (std::vector<State*>::const_iterator iter = state.begin();
660  iter != state.end(); iter++)
661  {
662  std::string desc = (*iter)->GetConnectionDescription();
663  if (!desc.empty()) {
664  ss2 << (first ? "" : ",") << desc;
665  first = false;
666  }
667  }
668  if (!first)
669  ss << "RemoteConnections: " << ss2.str() << crlf;
670  ss << "End" << crlf;
671  rec.bytes_transferred = bytes_transferred;
672  logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
673 
674  return req.ChunkResp(ss.str().c_str(), 0);
675 }
676 
677 /******************************************************************************/
678 /* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
679 /******************************************************************************/
680 
681 int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
682  TPCLogRecord &rec)
683 {
684  // Create the multi-handle and add in the current transfer to it.
685  CURLM *multi_handle = curl_multi_init();
686  if (!multi_handle) {
687  rec.status = 500;
688  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
689  "Failed to initialize a libcurl multi-handle");
690  std::stringstream ss;
691  ss << "Failed to initialize internal server memory";
692  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
693  }
694 
695  //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
696 
697  CURLMcode mres;
698  mres = curl_multi_add_handle(multi_handle, curl);
699  if (mres) {
700  rec.status = 500;
701  std::stringstream ss;
702  ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
703  logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
704  curl_multi_cleanup(multi_handle);
705  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
706  }
707 
708  // Start response to client prior to the first call to curl_multi_perform
709  int retval = req.StartChunkedResp(202, NULL, "Content-Type: text/plain");
710  if (retval) {
711  curl_multi_cleanup(multi_handle);
712  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
713  "Failed to send the initial response to the TPC client");
714  return retval;
715  } else {
716  logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
717  "Initial transfer response sent to the TPC client");
718  }
719 
720  // Transfer loop: use curl to actually run the transfer, but periodically
721  // interrupt things to send back performance updates to the client.
722  int running_handles = 1;
723  time_t last_marker = 0;
724  // Track how long it's been since the last time we recorded more bytes being transferred.
725  off_t last_advance_bytes = 0;
726  time_t last_advance_time = time(NULL);
727  time_t transfer_start = last_advance_time;
728  CURLcode res = static_cast<CURLcode>(-1);
729  do {
730  time_t now = time(NULL);
731  time_t next_marker = last_marker + m_marker_period;
732  if (now >= next_marker) {
733  off_t bytes_xfer = state.BytesTransferred();
734  if (bytes_xfer > last_advance_bytes) {
735  last_advance_bytes = bytes_xfer;
736  last_advance_time = now;
737  }
738  if (SendPerfMarker(req, rec, state)) {
739  curl_multi_remove_handle(multi_handle, curl);
740  curl_multi_cleanup(multi_handle);
741  logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
742  "Failed to send a perf marker to the TPC client");
743  return -1;
744  }
745  int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
746  if (now > last_advance_time + timeout) {
747  const char *log_prefix = rec.log_prefix.c_str();
748  bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
749 
750  state.SetErrorCode(10);
751  std::stringstream ss;
752  ss << "Transfer failed because no bytes have been "
753  << (tpc_pull ? "received from the source (pull mode) in "
754  : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
755  state.SetErrorMessage(ss.str());
756  curl_multi_remove_handle(multi_handle, curl);
757  curl_multi_cleanup(multi_handle);
758  break;
759  }
760  last_marker = now;
761  }
762  // The transfer will start after this point, notify the packet marking manager
763  rec.pmarkManager.startTransfer();
764  mres = curl_multi_perform(multi_handle, &running_handles);
765  if (mres == CURLM_CALL_MULTI_PERFORM) {
766  // curl_multi_perform should be called again immediately. On newer
767  // versions of curl, this is no longer used.
768  continue;
769  } else if (mres != CURLM_OK) {
770  break;
771  } else if (running_handles == 0) {
772  break;
773  }
774 
775  rec.pmarkManager.beginPMarks();
776  //printf("There are %d running handles\n", running_handles);
777 
778  // Harvest any messages, looking for CURLMSG_DONE.
779  CURLMsg *msg;
780  do {
781  int msgq = 0;
782  msg = curl_multi_info_read(multi_handle, &msgq);
783  if (msg && (msg->msg == CURLMSG_DONE)) {
784  CURL *easy_handle = msg->easy_handle;
785  res = msg->data.result;
786  curl_multi_remove_handle(multi_handle, easy_handle);
787  }
788  } while (msg);
789 
790  int64_t max_sleep_time = next_marker - time(NULL);
791  if (max_sleep_time <= 0) {
792  continue;
793  }
794  int fd_count;
795  mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
796  if (mres != CURLM_OK) {
797  break;
798  }
799  } while (running_handles);
800 
801  if (mres != CURLM_OK) {
802  std::stringstream ss;
803  ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
804  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
805 
806  curl_multi_remove_handle(multi_handle, curl);
807  curl_multi_cleanup(multi_handle);
808 
809  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
810  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
811  "Failed to send error message to the TPC client");
812  return retval;
813  }
814  return req.ChunkResp(NULL, 0);
815  }
816 
817  // Harvest any messages, looking for CURLMSG_DONE.
818  CURLMsg *msg;
819  do {
820  int msgq = 0;
821  msg = curl_multi_info_read(multi_handle, &msgq);
822  if (msg && (msg->msg == CURLMSG_DONE)) {
823  CURL *easy_handle = msg->easy_handle;
824  res = msg->data.result;
825  curl_multi_remove_handle(multi_handle, easy_handle);
826  }
827  } while (msg);
828 
829  if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
830  curl_multi_remove_handle(multi_handle, curl);
831  curl_multi_cleanup(multi_handle);
832  std::stringstream ss;
833  ss << "Internal state error in libcurl";
834  logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
835 
836  if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
837  logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
838  "Failed to send error message to the TPC client");
839  return retval;
840  }
841  return req.ChunkResp(NULL, 0);
842  }
843  curl_multi_cleanup(multi_handle);
844 
845  state.Flush();
846 
847  rec.bytes_transferred = state.BytesTransferred();
848  rec.tpc_status = state.GetStatusCode();
849 
850  // Explicitly finalize the stream (which will close the underlying file
851  // handle) before the response is sent. In some cases, subsequent HTTP
852  // requests can occur before the filesystem is done closing the handle -
853  // and those requests may occur against partial data.
854  state.Finalize();
855 
856  // Generate the final response back to the client.
857  std::stringstream ss;
858  bool success = false;
859  if (state.GetStatusCode() >= 400) {
860  std::string err = state.GetErrorMessage();
861  std::stringstream ss2;
862  ss2 << "Remote side failed with status code " << state.GetStatusCode();
863  if (!err.empty()) {
864  std::replace(err.begin(), err.end(), '\n', ' ');
865  ss2 << "; error message: \"" << err << "\"";
866  }
867  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
868  ss << generateClientErr(ss2, rec);
869  } else if (state.GetErrorCode()) {
870  std::string err = state.GetErrorMessage();
871  if (err.empty()) {err = "(no error message provided)";}
872  else {std::replace(err.begin(), err.end(), '\n', ' ');}
873  std::stringstream ss2;
874  ss2 << "Error when interacting with local filesystem: " << err;
875  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
876  ss << generateClientErr(ss2, rec);
877  } else if (res != CURLE_OK) {
878  std::stringstream ss2;
879  ss2 << "Internal transfer failure";
880  std::stringstream ss3;
881  ss3 << ss2.str() << ": " << curl_easy_strerror(res);
882  logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
883  ss << generateClientErr(ss2, rec, res);
884  } else {
885  ss << "success: Created";
886  success = true;
887  }
888 
889  if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
890  logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
891  "Failed to send last update to remote client");
892  return retval;
893  } else if (success) {
894  logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
895  rec.status = 0;
896  }
897  return req.ChunkResp(NULL, 0);
898 }
899 
900 /******************************************************************************/
901 /* T P C H a n d l e r : : P r o c e s s P u s h R e q */
902 /******************************************************************************/
903 
904 int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
905  TPCLogRecord rec(req, TpcType::Push);
906  rec.allow_local = m_allow_local;
907  rec.allow_private = m_allow_private;
908  rec.log_prefix = "PushRequest";
909  rec.local = req.resource;
910  rec.remote = resource;
911  rec.m_log = &m_log;
912  char *name = req.GetSecEntity().name;
913  req.GetClientID(rec.clID);
914  if (name) rec.name = name;
915  logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
916 
917  ManagedCurlHandle curlPtr(curl_easy_init());
918  auto curl = curlPtr.get();
919  if (!curl) {
920  std::stringstream ss;
921  ss << "Failed to initialize internal transfer resources";
922  rec.status = 500;
923  logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
924  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
925  }
926  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
927  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
928 #if CURL_AT_LEAST_VERSION(7, 85, 0)
929  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
930  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
931 #else
932  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
933  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
934  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
935 #endif
936  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
937  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
938  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
939  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
940  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
941  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
942 
943  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
944  std::string redirect_resource = req.resource;
945  if (query_header != req.headers.end()) {
946  redirect_resource = query_header->second;
947  }
948 
949  AtomicBeg(m_monid_mutex);
950  uint64_t file_monid = AtomicInc(m_monid);
951  AtomicEnd(m_monid_mutex);
952  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
953  if (!fh.get()) {
954  rec.status = 500;
955  std::stringstream ss;
956  ss << "Failed to initialize internal transfer file handle";
957  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
958  ss.str());
959  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
960  }
961  std::string full_url = prepareURL(req);
962 
963  std::string authz = GetAuthz(req);
964 
965  int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
966  req.GetSecEntity(), authz);
967  if (SFS_REDIRECT == open_results) {
968  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
969  return result;
970  } else if (SFS_OK != open_results) {
971  int code;
972  std::stringstream ss;
973  const char *msg = fh->error.getErrText(code);
974  if (msg == NULL) ss << "Failed to open local resource";
975  else ss << msg;
976  rec.status = mapErrNoToHttp(code);
977  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
978  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
979  fh->close();
980  return resp_result;
981  }
982  ConfigureCurlCA(curl);
983  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
984 
985  Stream stream(std::move(fh), 0, 0, m_log);
986  State state(0, stream, curl, true, req.tpcForwardCreds);
987  state.SetupHeaders(req);
988 
989  return RunCurlWithUpdates(curl, req, state, rec);
990 }
991 
992 /******************************************************************************/
993 /* T P C H a n d l e r : : P r o c e s s P u l l R e q */
994 /******************************************************************************/
995 
996 int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
997  TPCLogRecord rec(req,TpcType::Pull);
998  rec.allow_local = m_allow_local;
999  rec.allow_private = m_allow_private;
1000  rec.log_prefix = "PullRequest";
1001  rec.local = req.resource;
1002  rec.remote = resource;
1003  rec.m_log = &m_log;
1004  char *name = req.GetSecEntity().name;
1005  req.GetClientID(rec.clID);
1006  if (name) rec.name = name;
1007  logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
1008 
1009  ManagedCurlHandle curlPtr(curl_easy_init());
1010  auto curl = curlPtr.get();
1011  if (!curl) {
1012  std::stringstream ss;
1013  ss << "Failed to initialize internal transfer resources";
1014  rec.status = 500;
1015  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1016  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1017  }
1018 
1019  // ddavila 2023-01-05:
1020  // The following change was required by the Rucio/SENSE project where
1021  // multiple IP addresses, each from a different subnet, are assigned to a
1022  // single server and routed differently by SENSE.
1023  // The above requires the server to utilize the same IP, that was used to
1024  // start the TPC, for the resolution of the given TPC instead of
1025  // using any of the IPs available.
1026  if (m_fixed_route){
1027  XrdNetAddr *nP;
1028  int numIP = 0;
1029  char buff[1024];
1030  char * ip;
1031 
1032  // Get the hostname used to contact the server from the http header
1033  auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
1034  std::string host_used;
1035  if (host_header != req.headers.end()) {
1036  host_used = host_header->second;
1037  }
1038 
1039  // Get the IP addresses associated with the above hostname
1040  XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
1041  int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
1042  ip = (char *)malloc(ip_size-1);
1043 
1044  // Substring to get only the address, remove brackets and garbage
1045  memcpy(ip, buff+1, ip_size-2);
1046  ip[ip_size-2]='\0';
1047  logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
1048 
1049  curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
1050  }
1051  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
1052  curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
1053 #if CURL_AT_LEAST_VERSION(7, 85, 0)
1054  curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
1055  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
1056 #else
1057  long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
1058  curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
1059  curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
1060 #endif
1061  curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
1062  curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
1063  curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
1064  curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
1065  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
1066  curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
1067  curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
1068  std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
1069  if (!fh.get()) {
1070  std::stringstream ss;
1071  ss << "Failed to initialize internal transfer file handle";
1072  rec.status = 500;
1073  logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1074  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1075  }
1076  auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1077  std::string redirect_resource = req.resource;
1078  if (query_header != req.headers.end()) {
1079  redirect_resource = query_header->second;
1080  }
1082  auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1083  if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1084  if (! usingEC) mode = SFS_O_TRUNC;
1085  }
1086  int streams = 1;
1087  {
1088  auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1089  if (streams_header != req.headers.end()) {
1090  int stream_req = -1;
1091  try {
1092  stream_req = std::stol(streams_header->second);
1093  } catch (...) { // Handled below
1094  }
1095  if (stream_req < 0 || stream_req > 100) {
1096  std::stringstream ss;
1097  ss << "Invalid request for number of streams";
1098  rec.status = 400;
1099  logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1100  return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1101  }
1102  streams = stream_req == 0 ? 1 : stream_req;
1103  }
1104  }
1105  rec.streams = streams;
1106  std::string full_url = prepareURL(req);
1107  std::string authz = GetAuthz(req);
1108  curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1109  ConfigureCurlCA(curl);
1110  uint64_t sourceFileContentLength = 0;
1111  {
1112  //Get the content-length of the source file and pass it to the OSS layer
1113  //during the open
1114  bool success = false;
1115  bool mismatchDigests = false;
1116  std::map<std::string,std::string> sourceFileReprDigest;
1117  GetRemoteFileInfoTPCPull(curl, req, sourceFileContentLength, sourceFileReprDigest, success, rec);
1118  if(success) {
1119  //In the case we cannot get the information from the source server (offline or other error)
1120  //we just don't add the file information to the opaque of the local file to open
1121  full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
1122  mismatchDigests = mismatchReprDigest(sourceFileReprDigest,req,rec);
1123  }
1124  if(!success || mismatchDigests) {
1125  // We could not get remote file information, or the checksum provided by the client
1126  // does not match the source file one, we already sent the error to the client so we
1127  // just exit here
1128  return 0;
1129  }
1130  }
1131  int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1132  0644 | SFS_O_MKPTH,
1133  req.GetSecEntity(), authz);
1134  if (SFS_REDIRECT == open_result) {
1135  int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1136  return result;
1137  } else if (SFS_OK != open_result) {
1138  int code;
1139  std::stringstream ss;
1140  const char *msg = fh->error.getErrText(code);
1141  if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1142  else ss << msg;
1143  rec.status = mapErrNoToHttp(code);
1144  logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1145  int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1146  generateClientErr(ss, rec).c_str(), 0);
1147  fh->close();
1148  return resp_result;
1149  }
1150  Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1151  State state(0, stream, curl, false, req.tpcForwardCreds);
1152  state.SetupHeaders(req);
1153  state.SetContentLength(sourceFileContentLength);
1154 
1155  if (streams > 1) {
1156  return RunCurlWithStreams(req, state, streams, rec);
1157  } else {
1158  return RunCurlWithUpdates(curl, req, state, rec);
1159  }
1160 }
1161 
1162 /******************************************************************************/
1163 /* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1164 /******************************************************************************/
1165 
1166 void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1167  const std::string &event, const std::string &message)
1168 {
1169  if (!(m_log.getMsgMask() & mask)) {return;}
1170 
1171  std::stringstream ss;
1172  ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1173  if (rec.name.empty())
1174  ss << ", user=(anonymous)";
1175  else
1176  ss << ", user=" << rec.name;
1177  if (rec.streams != 1)
1178  ss << ", streams=" << rec.streams;
1179  if (rec.bytes_transferred >= 0)
1180  ss << ", bytes_transferred=" << rec.bytes_transferred;
1181  if (rec.status >= 0)
1182  ss << ", status=" << rec.status;
1183  if (rec.tpc_status >= 0)
1184  ss << ", tpc_status=" << rec.tpc_status;
1185  if (!message.empty())
1186  ss << "; " << message;
1187  m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1188 }
1189 
1190 std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1191  std::stringstream ssret;
1192  ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1193  if(cCode != CURLcode::CURLE_OK) {
1194  ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1195  }
1196  return ssret.str();
1197 }
1198 /******************************************************************************/
1199 /* X r d H t t p G e t E x t H a n d l e r */
1200 /******************************************************************************/
1201 
1202 extern "C" {
1203 
1204 XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1205  if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1206  log->Emsg("TPCInitialize", "libcurl failed to initialize");
1207  return NULL;
1208  }
1209 
1210  TPCHandler *retval{NULL};
1211  if (!config) {
1212  log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1213  return NULL;
1214  }
1215  try {
1216  log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1217  retval = new TPCHandler(log, config, myEnv);
1218  } catch (std::runtime_error &re) {
1219  log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1220  //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1221  }
1222  return retval;
1223 }
1224 
1225 }
void CURL
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
static std::string PrepareURL(const std::string &url)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
static bool IsAllowedScheme(const std::string &url)
int mapErrNoToHttp(int errNo)
std::string httpStatusToString(int status)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
#define close(a)
Definition: XrdPosix.hh:48
bool Debug
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
@ Error
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
const std::map< std::string, std::string > & GetReprDigest() const
void SetupHeadersForHEAD(XrdHttpExtReq &req)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
std::string verb
std::map< std::string, std::string > mReprDigest
Repr-Digest map where the key is the digest name and the value is the base64 encoded digest value.
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(PrepareOpenURLParams &params)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
Definition: XrdNetUtils.cc:274
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
Definition: XrdOucTUtils.hh:79
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:175
int getMsgMask()
Definition: XrdSysError.hh:190
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
Definition: XrdSysError.hh:167
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
@ Warning
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info
static const int isaPush