XRootD
Loading...
Searching...
No Matches
XrdHttpTpcTPC.cc
Go to the documentation of this file.
4#include "XrdOuc/XrdOucEnv.hh"
8#include "XrdSys/XrdSysFD.hh"
9#include "XrdVersion.hh"
10
15
16#include <curl/curl.h>
17
18#include <dlfcn.h>
19#include <fcntl.h>
20
21#include <algorithm>
22#include <memory>
23#include <sstream>
24#include <stdexcept>
25#include <thread>
26
27#include "XrdHttpTpcState.hh"
28#include "XrdHttpTpcStream.hh"
29#include "XrdHttpTpcTPC.hh"
30#include <fstream>
31
32using namespace TPC;
33
34XrdXrootdTpcMon* TPCHandler::TPCLogRecord::tpcMonitor = 0;
35
36uint64_t TPCHandler::m_monid{0};
37int TPCHandler::m_marker_period = 5;
38size_t TPCHandler::m_block_size = 16*1024*1024;
39size_t TPCHandler::m_small_block_size = 1*1024*1024;
40XrdSysMutex TPCHandler::m_monid_mutex;
41
43
44/******************************************************************************/
45/* T P C H a n d l e r : : T P C L o g R e c o r d D e s t r u c t o r */
46/******************************************************************************/
47
48TPCHandler::TPCLogRecord::~TPCLogRecord()
49{
50// Record monitoring data is enabled
51//
52 if (tpcMonitor)
54
55 monInfo.clID = clID.c_str();
56 monInfo.begT = begT;
57 gettimeofday(&monInfo.endT, 0);
58
59 if (mTpcType == TpcType::Pull)
60 {monInfo.dstURL = local.c_str();
61 monInfo.srcURL = remote.c_str();
62 } else {
63 monInfo.dstURL = remote.c_str();
64 monInfo.srcURL = local.c_str();
66 }
67
68 if (!status) monInfo.endRC = 0;
69 else if (tpc_status > 0) monInfo.endRC = tpc_status;
70 else monInfo.endRC = 1;
71 monInfo.strm = static_cast<unsigned char>(streams);
72 monInfo.fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
73 if (!isIPv6) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;
74
75 tpcMonitor->Report(monInfo);
76 }
77}
78
79/******************************************************************************/
80/* C u r l D e l e t e r : : o p e r a t o r ( ) */
81/******************************************************************************/
82
84{
85 if (curl) curl_easy_cleanup(curl);
86}
87
88/******************************************************************************/
89/* s o c k o p t _ s e t c l o e x e c _ c a l l b a c k */
90/******************************************************************************/
91
100int TPCHandler::sockopt_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
101 TPCLogRecord * rec = (TPCLogRecord *)clientp;
102 if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
103 // We will not reach this callback if the corresponding socket could not have been connected
104 // the socket is already connected only if the packet marking is enabled
105 return CURL_SOCKOPT_ALREADY_CONNECTED;
106 }
107 return CURL_SOCKOPT_OK;
108}
109
110/******************************************************************************/
111/* o p e n s o c k e t _ c a l l b a c k */
112/******************************************************************************/
113
114
119int TPCHandler::opensocket_callback(void *clientp,
120 curlsocktype purpose,
121 struct curl_sockaddr *aInfo)
122{
123 /* CURLSOCKTYPE_IPCXN (for IP based connections) is the only type currently known by curl,
124 * so let's make sure to reject other types if they appear in the furure */
125 if (purpose != CURLSOCKTYPE_IPCXN)
126 return CURL_SOCKET_BAD;
127
128 if (!aInfo)
129 return CURL_SOCKET_BAD;
130
131 // Create the socket (note that O_CLOEXEC flag will be set)
132 int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
133
134 if (fd < 0) {
135 return CURL_SOCKET_BAD;
136 }
137
138 if (!clientp)
139 return fd;
140
141 XrdNetAddr thePeer(&(aInfo->addr));
142 TPCLogRecord *rec = static_cast<TPCLogRecord*>(clientp);
143
144 /* Reject attempts to connect to local/private addresses unless allowed by configuration */
145 if ((!rec->allow_private && thePeer.isPrivate()) || (!rec->allow_local && thePeer.isLocal())) {
146 rec->tpc_status = 403; // Forbidden
147 rec->m_log->Emsg(rec->log_prefix.c_str(),
148 "Connection to local/private address is forbidden");
149 close(fd);
150 return CURL_SOCKET_BAD;
151 }
152
153 rec->isIPv6 = (thePeer.isIPType(XrdNetAddrInfo::IPv6) && !thePeer.isMapped());
154
155 std::stringstream connectErrMsg;
156 if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
157 rec->m_log->Emsg(rec->log_prefix.c_str(), "Unable to connect socket: ", connectErrMsg.str().c_str());
158 close(fd);
159 return CURL_SOCKET_BAD;
160 }
161
162 return fd;
163}
164
165int TPCHandler::closesocket_callback(void *clientp, curl_socket_t fd) {
166 TPCLogRecord * rec = (TPCLogRecord *)clientp;
167
168 // Destroy the PMark handle associated to the file descriptor before closing it.
169 // Otherwise, we would lose the socket usage information if the socket is closed before
170 // the PMark handle is closed.
171 rec->pmarkManager.endPmark(fd);
172
173 return close(fd);
174}
175
176/******************************************************************************/
177/* p r e p a r e U R L */
178/******************************************************************************/
179
180// See XrdHttpTpcUtils::prepareOpenURL() documentation
181std::string TPCHandler::prepareURL(XrdHttpExtReq &req) {
182 return XrdHttpTpcUtils::prepareOpenURL(req.resource, req.headers,hdr2cgimap);
183}
184
185/******************************************************************************/
186/* e n c o d e _ x r o o t d _ o p a q u e _ t o _ u r i */
187/******************************************************************************/
188
189// When processing a redirection from the filesystem layer, it is permitted to return
190// some xrootd opaque data. The quoting rules for xrootd opaque data are significantly
191// more permissive than a URI (basically, only '&' and '=' are disallowed while some
192// URI parsers may dislike characters like '"'). This function takes an opaque string
193// (e.g., foo=1&bar=2&baz=") and makes it safe for all URI parsers.
194std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
195{
196 std::stringstream parser(opaque);
197 std::string sequence;
198 std::stringstream output;
199 bool first = true;
200 while (getline(parser, sequence, '&')) {
201 if (sequence.empty()) {continue;}
202 size_t equal_pos = sequence.find('=');
203 char *val = NULL;
204 if (equal_pos != std::string::npos)
205 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
206 // Do not emit parameter if value exists and escaping failed.
207 if (!val && equal_pos != std::string::npos) {continue;}
208
209 if (!first) output << "&";
210 first = false;
211 output << sequence.substr(0, equal_pos);
212 if (val) {
213 output << "=" << val;
214 curl_free(val);
215 }
216 }
217 return output.str();
218}
219
220/******************************************************************************/
221/* T P C H a n d l e r : : C o n f i g u r e C u r l C A */
222/******************************************************************************/
223
224void
225TPCHandler::ConfigureCurlCA(CURL *curl)
226{
227 auto ca_filename = m_ca_file ? m_ca_file->CAFilename() : "";
228 auto crl_filename = m_ca_file ? m_ca_file->CRLFilename() : "";
229 if (!ca_filename.empty() && !crl_filename.empty()) {
230 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
231 //Check that the CRL file contains at least one entry before setting this option to curl
232 //Indeed, an empty CRL file will make curl unhappy and therefore will fail
233 //all HTTP TPC transfers (https://github.com/xrootd/xrootd/issues/1543)
234 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
235 if(in.tellg() > 0 && m_ca_file->atLeastOneValidCRLFound()){
236 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
237 } else {
238 std::ostringstream oss;
239 oss << "No valid CRL file has been found in the file " << crl_filename << ". Disabling CRL checking.";
240 m_log.Log(Warning,"TpcHandler",oss.str().c_str());
241 }
242 }
243 else if (!m_cadir.empty()) {
244 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
245 }
246 if (!m_cafile.empty()) {
247 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
248 }
249}
250
251
252bool TPCHandler::MatchesPath(const char *verb, const char *path) {
253 return !strcmp(verb, "COPY") || !strcmp(verb, "OPTIONS");
254}
255
256/******************************************************************************/
257/* P r e p a r e U R L */
258/******************************************************************************/
259
// Rewrite a URL that starts with one of the aliases "davs://", "s3://" or
// "s3s://" so that it starts with "https://" instead. Any other URL is
// returned unchanged. Matching is case-sensitive.
static std::string PrepareURL(const std::string &url)
{
  const std::string https_aliases[] = { "davs://", "s3://", "s3s://" };

  for (const std::string &alias : https_aliases) {
    const bool starts_with_alias = (url.compare(0, alias.size(), alias) == 0);
    if (starts_with_alias) {
      std::string rewritten("https://");
      rewritten.append(url, alias.size(), std::string::npos);
      return rewritten;
    }
  }

  return url;
}
270
// Tell whether a URL starts with "https://" or "http://".
// Matching is case-sensitive.
static bool IsAllowedScheme(const std::string& url)
{
  const std::string permitted[] = { "https://", "http://" };

  return std::any_of(std::begin(permitted), std::end(permitted),
                     [&url](const std::string &prefix) {
                       return url.compare(0, prefix.size(), prefix) == 0;
                     });
}
281
282/******************************************************************************/
283/* T P C H a n d l e r : : P r o c e s s R e q */
284/******************************************************************************/
285
287 if (req.verb == "OPTIONS") {
288 return ProcessOptionsReq(req);
289 }
290 auto header = XrdOucTUtils::caseInsensitiveFind(req.headers,"credential");
291 if (header != req.headers.end()) {
292 if (header->second != "none") {
293 m_log.Emsg("ProcessReq", "COPY requested an unsupported credential type: ", header->second.c_str());
294 return req.SendSimpleResp(400, NULL, NULL, "COPY requestd an unsupported Credential type", 0);
295 }
296 }
297 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"source");
298 if (header != req.headers.end()) {
299 std::string src = PrepareURL(header->second);
300 if (!IsAllowedScheme(src)) {
301 const char *error_src = "COPY rejected: disallowed scheme in source URL";
302 m_log.Emsg("ProcessReq", error_src, src.c_str());
303 return req.SendSimpleResp(400, NULL, NULL, error_src, 0);
304 }
305 return ProcessPullReq(src, req);
306 }
307 header = XrdOucTUtils::caseInsensitiveFind(req.headers,"destination");
308 if (header != req.headers.end()) {
309 const std::string& dst = header->second;
310 if (!IsAllowedScheme(dst)) {
311 const char *error_dst = "COPY rejected: disallowed scheme in destination URL";
312 m_log.Emsg("ProcessReq", error_dst, dst.c_str());
313 return req.SendSimpleResp(400, NULL, NULL, error_dst, 0);
314 }
315 return ProcessPushReq(header->second, req);
316 }
317 m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
318 return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
319}
320
321/******************************************************************************/
322/* T P C H a n d l e r D e s t r u c t o r */
323/******************************************************************************/
324
326 m_sfs = NULL;
327}
328
329/******************************************************************************/
330/* T P C H a n d l e r C o n s t r u c t o r */
331/******************************************************************************/
332
333TPCHandler::TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv) :
334 m_allow_local(false),
335 m_allow_private(true),
336 m_desthttps(false),
337 m_fixed_route(false),
338 m_timeout(60),
339 m_first_timeout(120),
340 m_log(log->logger(), "TPC_"),
341 m_sfs(NULL)
342{
343 if (!Configure(config, myEnv)) {
344 throw std::runtime_error("Failed to configure the HTTP third-party-copy handler.");
345 }
346
347// Extract out the TPC monitoring object (we share it with xrootd).
348//
349 XrdXrootdGStream *gs = (XrdXrootdGStream*)myEnv->GetPtr("Tpc.gStream*");
350 if (gs)
351 TPCLogRecord::tpcMonitor = new XrdXrootdTpcMon("http",log->logger(),*gs);
352}
353
354/******************************************************************************/
355/* T P C H a n d l e r : : P r o c e s s O p t i o n s R e q */
356/******************************************************************************/
357
361int TPCHandler::ProcessOptionsReq(XrdHttpExtReq &req) {
362 return req.SendSimpleResp(200, NULL, (char *) "DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
363}
364
365/******************************************************************************/
366/* T P C H a n d l e r : : G e t A u t h z */
367/******************************************************************************/
368
369std::string TPCHandler::GetAuthz(XrdHttpExtReq &req) {
370 std::string authz;
371 auto authz_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"authorization");
372 if (authz_header != req.headers.end()) {
373 std::stringstream ss;
374 ss << "authz=" << encode_str(authz_header->second);
375 authz += ss.str();
376 }
377 return authz;
378}
379
380/******************************************************************************/
381/* T P C H a n d l e r : : R e d i r e c t T r a n s f e r */
382/******************************************************************************/
383
384int TPCHandler::RedirectTransfer(CURL *curl, const std::string &redirect_resource,
385 XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &rec)
386{
387 int port;
388 const char *ptr = error.getErrText(port);
389 if ((ptr == NULL) || (*ptr == '\0') || (port == 0)) {
390 rec.status = 500;
391 std::stringstream ss;
392 ss << "Internal error: redirect without hostname";
393 logTransferEvent(LogMask::Error, rec, "REDIRECT_INTERNAL_ERROR", ss.str());
394 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
395 }
396
397 // Construct redirection URL taking into consideration any opaque info
398 std::string rdr_info = ptr;
399 std::string host, opaque;
400 size_t pos = rdr_info.find('?');
401 host = rdr_info.substr(0, pos);
402
403 if (pos != std::string::npos) {
404 opaque = rdr_info.substr(pos + 1);
405 }
406
407 std::stringstream ss;
408 ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << redirect_resource;
409
410 if (!opaque.empty()) {
411 ss << "?" << encode_xrootd_opaque_to_uri(curl, opaque);
412 }
413
414 rec.status = 307;
415 logTransferEvent(LogMask::Info, rec, "REDIRECT", ss.str());
416 return req.SendSimpleResp(rec.status, NULL, const_cast<char *>(ss.str().c_str()),
417 NULL, 0);
418}
419
420/******************************************************************************/
421/* T P C H a n d l e r : : O p e n W a i t S t a l l */
422/******************************************************************************/
423
424int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
425 int mode, int openMode, const XrdSecEntity &sec,
426 const std::string &authz)
427{
428 int open_result;
429 while (1) {
430 int orig_ucap = fh.error.getUCap();
431 fh.error.setUCap(orig_ucap | XrdOucEI::uIPv64);
432 std::string opaque;
433 size_t pos = resource.find('?');
434 // Extract the path and opaque info from the resource
435 std::string path = resource.substr(0, pos);
436
437 if (pos != std::string::npos) {
438 opaque = resource.substr(pos + 1);
439 }
440
441 // Append the authz information if there are some
442 if(!authz.empty()) {
443 opaque += (opaque.empty() ? "" : "&");
444 opaque += authz;
445 }
446 open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());
447
448 if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
449 int secs_to_stall = fh.error.getErrInfo();
450 if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
451 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
452 }
453 break;
454 }
455 return open_result;
456}
457
458/******************************************************************************/
459/* T P C H a n d l e r : : D e t e r m i n e X f e r S i z e */
460/******************************************************************************/
461
462
463
467int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
468 bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
469 success = false;
470 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
471 // Set a custom timeout of 60 seconds (= CONNECT_TIMEOUT for convenience) for the HEAD request
472 curl_easy_setopt(curl, CURLOPT_TIMEOUT, CONNECT_TIMEOUT);
473 CURLcode res;
474 res = curl_easy_perform(curl);
475 //Immediately set the CURLOPT_NOBODY flag to 0 as we anyway
476 //don't want the next curl call to do be a HEAD request
477 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
478 // Reset the CURLOPT_TIMEOUT to no timeout (default)
479 curl_easy_setopt(curl, CURLOPT_TIMEOUT, 0L);
480 curl_easy_setopt(curl, CURLOPT_FAILONERROR, true);
481
482 std::stringstream ss;
483
484 if (state.GetStatusCode() >= 400)
485 res = CURLE_HTTP_RETURNED_ERROR;
486
487 if (res != CURLE_OK) { /* curl failed */
488 ss << curl_easy_strerror(res);
489 switch (res) {
490 case CURLE_HTTP_RETURNED_ERROR: /* remote side may have returned an error */
491 rec.tpc_status = state.GetStatusCode(); /* relay status received from remote side to the client */
492 ss << ": remote host returned '" << rec.tpc_status << " "
493 << httpStatusToString(rec.tpc_status) << "' while fetching file size";
494 break;
495 case CURLE_COULDNT_CONNECT: /* socket callback may have failed */
496 switch (rec.tpc_status) {
497 case 403:
498 ss << ": connection to local/private addresses is forbidden";
499 break;
500 default:
501 ss << ": internal server failure";
502 rec.tpc_status = 500;
503 }
504 break;
505 default:
506 rec.tpc_status = 500;
507 state.SetErrorCode(500);
508 }
509 }
510
511 if (rec.tpc_status >= 400) {
512 logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
513 return shouldReturnErrorToClient ? req.SendSimpleResp(rec.tpc_status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
514 }
515
516 success = true;
517 ss << "Successfully determined remote size for pull request: " << state.GetContentLength();
518 logTransferEvent(LogMask::Debug, rec, "SIZE_SUCCESS", ss.str());
519 return 0;
520}
521
522int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
523 State state(curl,req.tpcForwardCreds);
524 //Don't forget to copy the headers of the client's request before doing the HEAD call. Otherwise, if there is a need for authentication,
525 //it will fail
526 state.SetupHeaders(req);
527 int result;
528 //In case we cannot get the content length, we return the error to the client
529 if ((result = DetermineXferSize(curl, req, state, success, rec)) || !success) {
530 return result;
531 }
532 contentLength = state.GetContentLength();
533 return result;
534}
535
536/******************************************************************************/
537/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
538/******************************************************************************/
539
540int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state) {
541 std::stringstream ss;
542 const std::string crlf = "\n";
543 ss << "Perf Marker" << crlf;
544 ss << "Timestamp: " << time(NULL) << crlf;
545 ss << "Stripe Index: 0" << crlf;
546 ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
547 ss << "Total Stripe Count: 1" << crlf;
548 // Include the TCP connection associated with this transfer; used by
549 // the TPC client for monitoring purposes.
550 std::string desc = state.GetConnectionDescription();
551 if (!desc.empty())
552 ss << "RemoteConnections: " << desc << crlf;
553 ss << "End" << crlf;
554 rec.bytes_transferred = state.BytesTransferred();
555 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
556
557 return req.ChunkResp(ss.str().c_str(), 0);
558}
559
560/******************************************************************************/
561/* T P C H a n d l e r : : S e n d P e r f M a r k e r */
562/******************************************************************************/
563
564int TPCHandler::SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
565 off_t bytes_transferred)
566{
567 // The 'performance marker' format is largely derived from how GridFTP works
568 // (e.g., the concept of `Stripe` is not quite so relevant here). See:
569 // https://twiki.cern.ch/twiki/bin/view/LCG/HttpTpcTechnical
570 // Example marker:
571 // Perf Marker\n
572 // Timestamp: 1537788010\n
573 // Stripe Index: 0\n
574 // Stripe Bytes Transferred: 238745\n
575 // Total Stripe Count: 1\n
576 // RemoteConnections: tcp:129.93.3.4:1234,tcp:[2600:900:6:1301:268a:7ff:fef6:a590]:2345\n
577 // End\n
578 //
579 std::stringstream ss;
580 const std::string crlf = "\n";
581 ss << "Perf Marker" << crlf;
582 ss << "Timestamp: " << time(NULL) << crlf;
583 ss << "Stripe Index: 0" << crlf;
584 ss << "Stripe Bytes Transferred: " << bytes_transferred << crlf;
585 ss << "Total Stripe Count: 1" << crlf;
586 // Build a list of TCP connections associated with this transfer; used by
587 // the TPC client for monitoring purposes.
588 bool first = true;
589 std::stringstream ss2;
590 for (std::vector<State*>::const_iterator iter = state.begin();
591 iter != state.end(); iter++)
592 {
593 std::string desc = (*iter)->GetConnectionDescription();
594 if (!desc.empty()) {
595 ss2 << (first ? "" : ",") << desc;
596 first = false;
597 }
598 }
599 if (!first)
600 ss << "RemoteConnections: " << ss2.str() << crlf;
601 ss << "End" << crlf;
602 rec.bytes_transferred = bytes_transferred;
603 logTransferEvent(LogMask::Debug, rec, "PERF_MARKER");
604
605 return req.ChunkResp(ss.str().c_str(), 0);
606}
607
608/******************************************************************************/
609/* T P C H a n d l e r : : R u n C u r l W i t h U p d a t e s */
610/******************************************************************************/
611
612int TPCHandler::RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, State &state,
613 TPCLogRecord &rec)
614{
615 // Create the multi-handle and add in the current transfer to it.
616 CURLM *multi_handle = curl_multi_init();
617 if (!multi_handle) {
618 rec.status = 500;
619 logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL",
620 "Failed to initialize a libcurl multi-handle");
621 std::stringstream ss;
622 ss << "Failed to initialize internal server memory";
623 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
624 }
625
626 //curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
627
628 CURLMcode mres;
629 mres = curl_multi_add_handle(multi_handle, curl);
630 if (mres) {
631 rec.status = 500;
632 std::stringstream ss;
633 ss << "Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
634 logTransferEvent(LogMask::Error, rec, "CURL_INIT_FAIL", ss.str());
635 curl_multi_cleanup(multi_handle);
636 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
637 }
638
639 // Start response to client prior to the first call to curl_multi_perform
640 int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
641 if (retval) {
642 curl_multi_cleanup(multi_handle);
643 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
644 "Failed to send the initial response to the TPC client");
645 return retval;
646 } else {
647 logTransferEvent(LogMask::Debug, rec, "RESPONSE_START",
648 "Initial transfer response sent to the TPC client");
649 }
650
651 // Transfer loop: use curl to actually run the transfer, but periodically
652 // interrupt things to send back performance updates to the client.
653 int running_handles = 1;
654 time_t last_marker = 0;
655 // Track how long it's been since the last time we recorded more bytes being transferred.
656 off_t last_advance_bytes = 0;
657 time_t last_advance_time = time(NULL);
658 time_t transfer_start = last_advance_time;
659 CURLcode res = static_cast<CURLcode>(-1);
660 do {
661 time_t now = time(NULL);
662 time_t next_marker = last_marker + m_marker_period;
663 if (now >= next_marker) {
664 off_t bytes_xfer = state.BytesTransferred();
665 if (bytes_xfer > last_advance_bytes) {
666 last_advance_bytes = bytes_xfer;
667 last_advance_time = now;
668 }
669 if (SendPerfMarker(req, rec, state)) {
670 curl_multi_remove_handle(multi_handle, curl);
671 curl_multi_cleanup(multi_handle);
672 logTransferEvent(LogMask::Error, rec, "PERFMARKER_FAIL",
673 "Failed to send a perf marker to the TPC client");
674 return -1;
675 }
676 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
677 if (now > last_advance_time + timeout) {
678 const char *log_prefix = rec.log_prefix.c_str();
679 bool tpc_pull = strncmp("Pull", log_prefix, 4) == 0;
680
681 state.SetErrorCode(10);
682 std::stringstream ss;
683 ss << "Transfer failed because no bytes have been "
684 << (tpc_pull ? "received from the source (pull mode) in "
685 : "transmitted to the destination (push mode) in ") << timeout << " seconds.";
686 state.SetErrorMessage(ss.str());
687 curl_multi_remove_handle(multi_handle, curl);
688 curl_multi_cleanup(multi_handle);
689 break;
690 }
691 last_marker = now;
692 }
693 // The transfer will start after this point, notify the packet marking manager
694 rec.pmarkManager.startTransfer();
695 mres = curl_multi_perform(multi_handle, &running_handles);
696 if (mres == CURLM_CALL_MULTI_PERFORM) {
697 // curl_multi_perform should be called again immediately. On newer
698 // versions of curl, this is no longer used.
699 continue;
700 } else if (mres != CURLM_OK) {
701 break;
702 } else if (running_handles == 0) {
703 break;
704 }
705
706 rec.pmarkManager.beginPMarks();
707 //printf("There are %d running handles\n", running_handles);
708
709 // Harvest any messages, looking for CURLMSG_DONE.
710 CURLMsg *msg;
711 do {
712 int msgq = 0;
713 msg = curl_multi_info_read(multi_handle, &msgq);
714 if (msg && (msg->msg == CURLMSG_DONE)) {
715 CURL *easy_handle = msg->easy_handle;
716 res = msg->data.result;
717 curl_multi_remove_handle(multi_handle, easy_handle);
718 }
719 } while (msg);
720
721 int64_t max_sleep_time = next_marker - time(NULL);
722 if (max_sleep_time <= 0) {
723 continue;
724 }
725 int fd_count;
726 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
727 if (mres != CURLM_OK) {
728 break;
729 }
730 } while (running_handles);
731
732 if (mres != CURLM_OK) {
733 std::stringstream ss;
734 ss << "Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
735 logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
736
737 curl_multi_remove_handle(multi_handle, curl);
738 curl_multi_cleanup(multi_handle);
739
740 if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
741 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
742 "Failed to send error message to the TPC client");
743 return retval;
744 }
745 return req.ChunkResp(NULL, 0);
746 }
747
748 // Harvest any messages, looking for CURLMSG_DONE.
749 CURLMsg *msg;
750 do {
751 int msgq = 0;
752 msg = curl_multi_info_read(multi_handle, &msgq);
753 if (msg && (msg->msg == CURLMSG_DONE)) {
754 CURL *easy_handle = msg->easy_handle;
755 res = msg->data.result;
756 curl_multi_remove_handle(multi_handle, easy_handle);
757 }
758 } while (msg);
759
760 if (!state.GetErrorCode() && res == static_cast<CURLcode>(-1)) { // No transfers returned?!?
761 curl_multi_remove_handle(multi_handle, curl);
762 curl_multi_cleanup(multi_handle);
763 std::stringstream ss;
764 ss << "Internal state error in libcurl";
765 logTransferEvent(LogMask::Error, rec, "TRANSFER_CURL_ERROR", ss.str());
766
767 if ((retval = req.ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
768 logTransferEvent(LogMask::Error, rec, "RESPONSE_FAIL",
769 "Failed to send error message to the TPC client");
770 return retval;
771 }
772 return req.ChunkResp(NULL, 0);
773 }
774 curl_multi_cleanup(multi_handle);
775
776 state.Flush();
777
778 rec.bytes_transferred = state.BytesTransferred();
779 rec.tpc_status = state.GetStatusCode();
780
781 // Explicitly finalize the stream (which will close the underlying file
782 // handle) before the response is sent. In some cases, subsequent HTTP
783 // requests can occur before the filesystem is done closing the handle -
784 // and those requests may occur against partial data.
785 state.Finalize();
786
787 // Generate the final response back to the client.
788 std::stringstream ss;
789 bool success = false;
790 if (state.GetStatusCode() >= 400) {
791 std::string err = state.GetErrorMessage();
792 std::stringstream ss2;
793 ss2 << "Remote side failed with status code " << state.GetStatusCode();
794 if (!err.empty()) {
795 std::replace(err.begin(), err.end(), '\n', ' ');
796 ss2 << "; error message: \"" << err << "\"";
797 }
798 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
799 ss << generateClientErr(ss2, rec);
800 } else if (state.GetErrorCode()) {
801 std::string err = state.GetErrorMessage();
802 if (err.empty()) {err = "(no error message provided)";}
803 else {std::replace(err.begin(), err.end(), '\n', ' ');}
804 std::stringstream ss2;
805 ss2 << "Error when interacting with local filesystem: " << err;
806 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss2.str());
807 ss << generateClientErr(ss2, rec);
808 } else if (res != CURLE_OK) {
809 std::stringstream ss2;
810 ss2 << "Internal transfer failure";
811 std::stringstream ss3;
812 ss3 << ss2.str() << ": " << curl_easy_strerror(res);
813 logTransferEvent(LogMask::Error, rec, "TRANSFER_FAIL", ss3.str());
814 ss << generateClientErr(ss2, rec, res);
815 } else {
816 ss << "success: Created";
817 success = true;
818 }
819
820 if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
821 logTransferEvent(LogMask::Error, rec, "TRANSFER_ERROR",
822 "Failed to send last update to remote client");
823 return retval;
824 } else if (success) {
825 logTransferEvent(LogMask::Info, rec, "TRANSFER_SUCCESS");
826 rec.status = 0;
827 }
828 return req.ChunkResp(NULL, 0);
829}
830
831/******************************************************************************/
832/* T P C H a n d l e r : : P r o c e s s P u s h R e q */
833/******************************************************************************/
834
835int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
836 TPCLogRecord rec(req, TpcType::Push);
837 rec.allow_local = m_allow_local;
838 rec.allow_private = m_allow_private;
839 rec.log_prefix = "PushRequest";
840 rec.local = req.resource;
841 rec.remote = resource;
842 rec.m_log = &m_log;
843 char *name = req.GetSecEntity().name;
844 req.GetClientID(rec.clID);
845 if (name) rec.name = name;
846 logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");
847
848 ManagedCurlHandle curlPtr(curl_easy_init());
849 auto curl = curlPtr.get();
850 if (!curl) {
851 std::stringstream ss;
852 ss << "Failed to initialize internal transfer resources";
853 rec.status = 500;
854 logTransferEvent(LogMask::Error, rec, "PUSH_FAIL", ss.str());
855 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
856 }
857 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
858 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
859#if CURL_AT_LEAST_VERSION(7, 85, 0)
860 curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
861 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
862#else
863 long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
864 curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
865 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
866#endif
867 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
868 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
869 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
870 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
871 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
872 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
873
874 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
875 std::string redirect_resource = req.resource;
876 if (query_header != req.headers.end()) {
877 redirect_resource = query_header->second;
878 }
879
880 AtomicBeg(m_monid_mutex);
881 uint64_t file_monid = AtomicInc(m_monid);
882 AtomicEnd(m_monid_mutex);
883 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, file_monid));
884 if (!fh.get()) {
885 rec.status = 500;
886 std::stringstream ss;
887 ss << "Failed to initialize internal transfer file handle";
888 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL",
889 ss.str());
890 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
891 }
892 std::string full_url = prepareURL(req);
893
894 std::string authz = GetAuthz(req);
895
896 int open_results = OpenWaitStall(*fh, full_url, SFS_O_RDONLY, 0644,
897 req.GetSecEntity(), authz);
898 if (SFS_REDIRECT == open_results) {
899 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
900 return result;
901 } else if (SFS_OK != open_results) {
902 int code;
903 std::stringstream ss;
904 const char *msg = fh->error.getErrText(code);
905 if (msg == NULL) ss << "Failed to open local resource";
906 else ss << msg;
907 rec.status = mapErrNoToHttp(code);
908 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", msg);
909 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
910 fh->close();
911 return resp_result;
912 }
913 ConfigureCurlCA(curl);
914 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
915
916 Stream stream(std::move(fh), 0, 0, m_log);
917 State state(0, stream, curl, true, req.tpcForwardCreds);
918 state.SetupHeaders(req);
919
920 return RunCurlWithUpdates(curl, req, state, rec);
921}
922
923/******************************************************************************/
924/* T P C H a n d l e r : : P r o c e s s P u l l R e q */
925/******************************************************************************/
926
927int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
928 TPCLogRecord rec(req,TpcType::Pull);
929 rec.allow_local = m_allow_local;
930 rec.allow_private = m_allow_private;
931 rec.log_prefix = "PullRequest";
932 rec.local = req.resource;
933 rec.remote = resource;
934 rec.m_log = &m_log;
935 char *name = req.GetSecEntity().name;
936 req.GetClientID(rec.clID);
937 if (name) rec.name = name;
938 logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");
939
940 ManagedCurlHandle curlPtr(curl_easy_init());
941 auto curl = curlPtr.get();
942 if (!curl) {
943 std::stringstream ss;
944 ss << "Failed to initialize internal transfer resources";
945 rec.status = 500;
946 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
947 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
948 }
949
950 // ddavila 2023-01-05:
951 // The following change was required by the Rucio/SENSE project where
952 // multiple IP addresses, each from a different subnet, are assigned to a
953 // single server and routed differently by SENSE.
954 // The above requires the server to utilize the same IP, that was used to
955 // start the TPC, for the resolution of the given TPC instead of
956 // using any of the IPs available.
957 if (m_fixed_route){
958 XrdNetAddr *nP;
959 int numIP = 0;
960 char buff[1024];
961 char * ip;
962
963 // Get the hostname used to contact the server from the http header
964 auto host_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"host");
965 std::string host_used;
966 if (host_header != req.headers.end()) {
967 host_used = host_header->second;
968 }
969
970 // Get the IP addresses associated with the above hostname
971 XrdNetUtils::GetAddrs(host_used.c_str(), &nP, numIP, XrdNetUtils::prefAuto, 0);
972 int ip_size = nP[0].Format(buff, 1024, XrdNetAddrInfo::fmtAddr,XrdNetAddrInfo::noPort);
973 ip = (char *)malloc(ip_size-1);
974
975 // Substring to get only the address, remove brackets and garbage
976 memcpy(ip, buff+1, ip_size-2);
977 ip[ip_size-2]='\0';
978 logTransferEvent(LogMask::Info, rec, "LOCAL IP", ip);
979
980 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
981 }
982 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
983 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_1_1);
984#if CURL_AT_LEAST_VERSION(7, 85, 0)
985 curl_easy_setopt(curl, CURLOPT_PROTOCOLS_STR, "https,http");
986 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS_STR, "https,http");
987#else
988 long protocols = CURLPROTO_HTTP | CURLPROTO_HTTPS;
989 curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
990 curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
991#endif
992 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
993 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
994 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
995 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
996 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
997 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
998 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
999 std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
1000 if (!fh.get()) {
1001 std::stringstream ss;
1002 ss << "Failed to initialize internal transfer file handle";
1003 rec.status = 500;
1004 logTransferEvent(LogMask::Error, rec, "PULL_FAIL", ss.str());
1005 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1006 }
1007 auto query_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"xrd-http-fullresource");
1008 std::string redirect_resource = req.resource;
1009 if (query_header != req.headers.end()) {
1010 redirect_resource = query_header->second;
1011 }
1013 auto overwrite_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"overwrite");
1014 if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
1015 if (! usingEC) mode = SFS_O_TRUNC;
1016 }
1017 int streams = 1;
1018 {
1019 auto streams_header = XrdOucTUtils::caseInsensitiveFind(req.headers,"x-number-of-streams");
1020 if (streams_header != req.headers.end()) {
1021 int stream_req = -1;
1022 try {
1023 stream_req = std::stol(streams_header->second);
1024 } catch (...) { // Handled below
1025 }
1026 if (stream_req < 0 || stream_req > 100) {
1027 std::stringstream ss;
1028 ss << "Invalid request for number of streams";
1029 rec.status = 400;
1030 logTransferEvent(LogMask::Info, rec, "INVALID_REQUEST", ss.str());
1031 return req.SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1032 }
1033 streams = stream_req == 0 ? 1 : stream_req;
1034 }
1035 }
1036 rec.streams = streams;
1037 std::string full_url = prepareURL(req);
1038 std::string authz = GetAuthz(req);
1039 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1040 ConfigureCurlCA(curl);
1041 uint64_t sourceFileContentLength = 0;
1042 {
1043 //Get the content-length of the source file and pass it to the OSS layer
1044 //during the open
1045 bool success;
1046 GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
1047 if(success) {
1048 //In the case we cannot get the information from the source server (offline or other error)
1049 //we just don't add the size information to the opaque of the local file to open
1050 full_url += "&oss.asize=" + std::to_string(sourceFileContentLength);
1051 } else {
1052 // In the case the GetContentLength is not successful, an error will be returned to the client
1053 // just exit here so we don't open the file!
1054 return 0;
1055 }
1056 }
1057 int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY,
1058 0644 | SFS_O_MKPTH,
1059 req.GetSecEntity(), authz);
1060 if (SFS_REDIRECT == open_result) {
1061 int result = RedirectTransfer(curl, redirect_resource, req, fh->error, rec);
1062 return result;
1063 } else if (SFS_OK != open_result) {
1064 int code;
1065 std::stringstream ss;
1066 const char *msg = fh->error.getErrText(code);
1067 if ((msg == NULL) || (*msg == '\0')) ss << "Failed to open local resource";
1068 else ss << msg;
1069 rec.status = mapErrNoToHttp(code);
1070 logTransferEvent(LogMask::Error, rec, "OPEN_FAIL", ss.str());
1071 int resp_result = req.SendSimpleResp(rec.status, NULL, NULL,
1072 generateClientErr(ss, rec).c_str(), 0);
1073 fh->close();
1074 return resp_result;
1075 }
1076 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1077 State state(0, stream, curl, false, req.tpcForwardCreds);
1078 state.SetupHeaders(req);
1079 state.SetContentLength(sourceFileContentLength);
1080
1081 if (streams > 1) {
1082 return RunCurlWithStreams(req, state, streams, rec);
1083 } else {
1084 return RunCurlWithUpdates(curl, req, state, rec);
1085 }
1086}
1087
1088/******************************************************************************/
1089/* T P C H a n d l e r : : l o g T r a n s f e r E v e n t */
1090/******************************************************************************/
1091
1092void TPCHandler::logTransferEvent(LogMask mask, const TPCLogRecord &rec,
1093 const std::string &event, const std::string &message)
1094{
1095 if (!(m_log.getMsgMask() & mask)) {return;}
1096
1097 std::stringstream ss;
1098 ss << "event=" << event << ", local=" << rec.local << ", remote=" << rec.remote;
1099 if (rec.name.empty())
1100 ss << ", user=(anonymous)";
1101 else
1102 ss << ", user=" << rec.name;
1103 if (rec.streams != 1)
1104 ss << ", streams=" << rec.streams;
1105 if (rec.bytes_transferred >= 0)
1106 ss << ", bytes_transferred=" << rec.bytes_transferred;
1107 if (rec.status >= 0)
1108 ss << ", status=" << rec.status;
1109 if (rec.tpc_status >= 0)
1110 ss << ", tpc_status=" << rec.tpc_status;
1111 if (!message.empty())
1112 ss << "; " << message;
1113 m_log.Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1114}
1115
1116std::string TPCHandler::generateClientErr(std::stringstream &err_ss, const TPCLogRecord &rec, CURLcode cCode) {
1117 std::stringstream ssret;
1118 ssret << "failure: " << err_ss.str() << ", local=" << rec.local <<", remote=" << rec.remote;
1119 if(cCode != CURLcode::CURLE_OK) {
1120 ssret << ", HTTP library failure=" << curl_easy_strerror(cCode);
1121 }
1122 return ssret.str();
1123}
1124/******************************************************************************/
1125/* X r d H t t p G e t E x t H a n d l e r */
1126/******************************************************************************/
1127
1128extern "C" {
1129
1130XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, const char * /*parms*/, XrdOucEnv *myEnv) {
1131 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1132 log->Emsg("TPCInitialize", "libcurl failed to initialize");
1133 return NULL;
1134 }
1135
1136 TPCHandler *retval{NULL};
1137 if (!config) {
1138 log->Emsg("TPCInitialize", "TPC handler requires a config filename in order to load");
1139 return NULL;
1140 }
1141 try {
1142 log->Emsg("TPCInitialize", "Will load configuration for the TPC handler from", config);
1143 retval = new TPCHandler(log, config, myEnv);
1144 } catch (std::runtime_error &re) {
1145 log->Emsg("TPCInitialize", "Encountered a runtime failure when loading ", re.what());
1146 //printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
1147 }
1148 return retval;
1149}
1150
1151}
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
void CURL
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
static std::string PrepareURL(const std::string &url)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
static bool IsAllowedScheme(const std::string &url)
int mapErrNoToHttp(int errNo)
std::string httpStatusToString(int status)
Utility functions for XrdHTTP.
std::string encode_str(const std::string &str)
#define close(a)
Definition XrdPosix.hh:48
void getline(uchar *buff, int blen)
#define SFS_REDIRECT
#define SFS_O_MKPTH
#define SFS_STALL
#define SFS_O_RDONLY
#define SFS_STARTED
#define SFS_O_WRONLY
#define SFS_O_CREAT
int XrdSfsFileOpenMode
#define SFS_OK
#define SFS_O_TRUNC
#define AtomicInc(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
if(Avsz)
int GetStatusCode() const
off_t BytesTransferred() const
void SetErrorMessage(const std::string &error_msg)
int GetErrorCode() const
std::string GetErrorMessage() const
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
void SetContentLength(const off_t content_length)
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual ~TPCHandler()
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
std::string resource
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap)
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:281
const char * getErrText()
void setUCap(int ucval)
Set user capabilties.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
char * name
Entity's name.
XrdOucErrInfo & error
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
virtual int close()=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info