xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdTpcTPC.hh
Go to the documentation of this file.
1 
2 #include <memory>
3 #include <string>
4 #include <vector>
5 #include <sys/time.h>
6 
8 
10 #include "XrdHttp/XrdHttpUtils.hh"
11 
12 #include "XrdTls/XrdTlsTempCA.hh"
13 
14 #include <curl/curl.h>
15 
16 class XrdOucErrInfo;
17 class XrdOucStream;
18 class XrdSfsFile;
19 class XrdSfsFileSystem;
21 typedef void CURL;
22 
23 namespace TPC {
24 class State;
25 
26 enum LogMask {
27  Debug = 0x01,
28  Info = 0x02,
29  Warning = 0x04,
30  Error = 0x08,
31  All = 0xff
32 };
33 
34 
35 struct CurlDeleter {
36  void operator()(CURL *curl);
37 };
38 using ManagedCurlHandle = std::unique_ptr<CURL, CurlDeleter>;
39 
40 
41 class TPCHandler : public XrdHttpExtHandler {
42 public:
43  TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv);
44  virtual ~TPCHandler();
45 
46  virtual bool MatchesPath(const char *verb, const char *path);
47  virtual int ProcessReq(XrdHttpExtReq &req);
48  // Abstract method in the base class, but does not seem to be used
49  virtual int Init(const char *cfgfile) {return 0;}
50 
51 private:
52 
53  static int sockopt_setcloexec_callback(void * clientp, curl_socket_t curlfd, curlsocktype purpose);
54  static int opensocket_callback(void *clientp,
55  curlsocktype purpose,
56  struct curl_sockaddr *address);
57 
58  struct TPCLogRecord {
59 
61  tpc_status(-1), streams( 1 ), isIPv6(false)
62  {
63  gettimeofday(&begT, 0); // Set effective start time
64  }
65  ~TPCLogRecord();
66 
67  std::string log_prefix;
68  std::string local;
69  std::string remote;
70  std::string name;
71  std::string clID;
73  timeval begT;
75  int status;
77  unsigned int streams;
78  bool isIPv6;
79  };
80 
82 
83  static std::string GetAuthz(XrdHttpExtReq &req);
84 
85  // Configure curl handle's CA settings. The CA files present here should
86  // be valid for the lifetime of the process.
87  void ConfigureCurlCA(CURL *curl);
88 
89  // Redirect the transfer according to the contents of an XrdOucErrInfo object.
90  int RedirectTransfer(CURL *curl, const std::string &redirect_resource, XrdHttpExtReq &req,
91  XrdOucErrInfo &error, TPCLogRecord &);
92 
93  int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode,
94  int openMode, const XrdSecEntity &sec,
95  const std::string &authz);
96 
97 #ifdef XRD_CHUNK_RESP
98  int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
99  bool &success, TPCLogRecord &);
100 
101  // Send a 'performance marker' back to the TPC client, informing it of our
102  // progress. The TPC client will use this information to determine whether
103  // the transfer is making sufficient progress and/or other monitoring info
104  // (such as whether the transfer is happening over IPv4, IPv6, or both).
105  int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, TPC::State &state);
106  int SendPerfMarker(XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
107  off_t bytes_transferred);
108 
109  // Perform the libcurl transfer, periodically sending back chunked updates.
110  int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
111  TPCLogRecord &rec);
112 
113  // Experimental multi-stream version of RunCurlWithUpdates
114  int RunCurlWithStreams(XrdHttpExtReq &req, TPC::State &state,
115  size_t streams, TPCLogRecord &rec);
116  int RunCurlWithStreamsImpl(XrdHttpExtReq &req, TPC::State &state,
117  size_t streams, std::vector<TPC::State*> &streams_handles,
118  std::vector<ManagedCurlHandle> &curl_handles,
119  TPCLogRecord &rec);
120 #else
121  int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
122  TPCLogRecord &rec);
123 #endif
124 
125  int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req);
126  int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req);
127 
128  bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt,
129  std::string &path2, bool &path2_alt);
130  bool Configure(const char *configfn, XrdOucEnv *myEnv);
132 
133  // Generate a consistently-formatted log message.
134  void logTransferEvent(LogMask lvl, const TPCLogRecord &record,
135  const std::string &event, const std::string &message="");
136 
137  static int m_marker_period;
138  static size_t m_block_size;
139  static size_t m_small_block_size;
141  int m_timeout; // the 'timeout interval'; if no bytes have been received during this time period, abort the transfer.
142  int m_first_timeout; // the 'first timeout interval'; the amount of time we're willing to wait to get the first byte.
143  // Unless explicitly specified, this is 2x the timeout interval.
144  std::string m_cadir; // The directory to use for CAs.
145  std::string m_cafile; // The file to use for CAs in libcurl
147  static uint64_t m_monid;
149  XrdSfsFileSystem *m_sfs;
150  std::shared_ptr<XrdTlsTempCA> m_ca_file;
151 
152  // 16 blocks in flight at 16 MB each, meaning that there will be up to 256MB
153  // in flight; this is equal to the bandwidth delay product of a 200ms transcontinental
154  // connection at 10Gbps.
155 #ifdef USE_PIPELINING
156  static const int m_pipelining_multiplier = 16;
157 #else
158  static const int m_pipelining_multiplier = 1;
159 #endif
160 
161  bool usingEC; // indicate if XrdEC is used
162 };
163 }
Definition: XrdTpcState.hh:20
std::string m_cafile
Definition: XrdTpcTPC.hh:145
std::string local
Definition: XrdTpcTPC.hh:68
LogMask
Definition: XrdTpcTPC.hh:26
Definition: XrdTpcTPC.hh:27
Definition: XrdTpcTPC.hh:58
bool m_desthttps
Definition: XrdTpcTPC.hh:140
virtual int ProcessReq(XrdHttpExtReq &req)
bool usingEC
Definition: XrdTpcTPC.hh:161
Definition: XrdOucStream.hh:46
bool isIPv6
Definition: XrdTpcTPC.hh:78
Utility functions for XrdHTTP.
static size_t m_block_size
Definition: XrdTpcTPC.hh:138
Definition: XrdXrootdTpcMon.hh:40
int tpc_status
Definition: XrdTpcTPC.hh:76
unsigned int streams
Definition: XrdTpcTPC.hh:77
Definition: XrdSysError.hh:89
std::string m_cadir
Definition: XrdTpcTPC.hh:144
static const int m_pipelining_multiplier
Definition: XrdTpcTPC.hh:158
bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt)
Definition: XrdTpcTPC.hh:28
static std::string GetAuthz(XrdHttpExtReq &req)
Definition: XrdSysPthread.hh:164
bool Configure(const char *configfn, XrdOucEnv *myEnv)
virtual int Init(const char *cfgfile)
Initializes the external request handler.
Definition: XrdTpcTPC.hh:49
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
std::string log_prefix
Definition: XrdTpcTPC.hh:67
int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode, int openMode, const XrdSecEntity &sec, const std::string &authz)
XrdCmsConfig Config
int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, TPC::State &state, TPCLogRecord &rec)
Definition: XrdTpcTPC.hh:41
std::shared_ptr< XrdTlsTempCA > m_ca_file
Definition: XrdTpcTPC.hh:150
static XrdXrootdTpcMon * tpcMonitor
Definition: XrdTpcTPC.hh:72
Definition: XrdOucErrInfo.hh:100
Definition: XrdTpcTPC.hh:30
int RedirectTransfer(CURL *curl, const std::string &redirect_resource, XrdHttpExtReq &req, XrdOucErrInfo &error, TPCLogRecord &)
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
Definition: XrdOucEnv.hh:41
Definition: XrdHttpExtHandler.hh:82
int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
TPCLogRecord()
Definition: XrdTpcTPC.hh:60
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
Definition: XrdTpcTPC.hh:38
static int m_marker_period
Definition: XrdTpcTPC.hh:137
Definition: XrdTpcTPC.hh:35
static uint64_t m_monid
Definition: XrdTpcTPC.hh:147
XrdSfsFileSystem * m_sfs
Definition: XrdTpcTPC.hh:149
static int opensocket_callback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
void operator()(CURL *curl)
static XrdSysMutex m_monid_mutex
Definition: XrdTpcTPC.hh:146
int m_first_timeout
Definition: XrdTpcTPC.hh:142
std::string clID
Definition: XrdTpcTPC.hh:71
static int sockopt_setcloexec_callback(void *clientp, curl_socket_t curlfd, curlsocktype purpose)
void logTransferEvent(LogMask lvl, const TPCLogRecord &record, const std::string &event, const std::string &message="")
int ProcessOptionsReq(XrdHttpExtReq &req)
Definition: XrdSecEntity.hh:63
bool ConfigureLogger(XrdOucStream &Config)
int ProcessPushReq(const std::string &resource, XrdHttpExtReq &req)
virtual ~TPCHandler()
Definition: XrdSfsInterface.hh:364
std::string remote
Definition: XrdTpcTPC.hh:69
int m_timeout
Definition: XrdTpcTPC.hh:141
Definition: XrdHttpExtHandler.hh:45
int status
Definition: XrdTpcTPC.hh:75
static size_t m_small_block_size
Definition: XrdTpcTPC.hh:139
void CURL
Definition: XrdTpcState.hh:13
std::string name
Definition: XrdTpcTPC.hh:70
Definition: XrdTpcTPC.hh:31
XrdSysError m_log
Definition: XrdTpcTPC.hh:148
Definition: XrdTpcTPC.hh:29
void ConfigureCurlCA(CURL *curl)
off_t bytes_transferred
Definition: XrdTpcTPC.hh:74
timeval begT
Definition: XrdTpcTPC.hh:73