XRootD
XrdClHttpFactory.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpFactory.hh"
22 #include "XrdClHttpFile.hh"
23 #include "XrdClHttpFilesystem.hh"
24 #include "XrdClHttpUtil.hh"
25 #include "XrdClHttpOps.hh"
26 #include "XrdClHttpParseTimeout.hh"
27 #include "XrdClHttpWorker.hh"
28 
29 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClLog.hh"
33 #include "XrdVersion.hh"
34 
35 #include <stdio.h>
36 #include <unistd.h>
37 
39 
40 using namespace XrdClHttp;
41 
42 struct timespec
43 Factory::GetHeaderTimeoutWithDefault(time_t oper_timeout)
44 {
45  if (oper_timeout == 0) {
47  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
48  oper_timeout = val;
49  }
50  if (oper_timeout <= 0) {
51  return {0, 0};
52  }
53  return {oper_timeout, 0};
54 }
55 
56 bool Factory::m_initialized = false;
57 std::shared_ptr<XrdClHttp::HandlerQueue> Factory::m_queue;
58 XrdCl::Log *Factory::m_log = nullptr;
59 std::once_flag Factory::m_init_once;
60 std::string Factory::m_stats_location;
61 std::chrono::system_clock::time_point Factory::m_start{};
62 
63 std::mutex Factory::m_shutdown_lock;
64 std::thread Factory::m_monitor_tid;
65 std::condition_variable Factory::m_shutdown_requested_cv;
66 bool Factory::m_shutdown_requested = false;
67 
68 // shutdown trigger, must be last of the static members
69 Factory::shutdown_s Factory::m_shutdowns;
70 
71 void
72 Factory::Initialize()
73 {
74  std::unique_lock lock(m_shutdown_lock);
75  if (m_shutdown_requested) {
76  return;
77  }
78  std::call_once(m_init_once, [&] {
79  m_log = XrdCl::DefaultEnv::GetLog();
80  if (!m_log) {
81  return;
82  }
83  m_log->SetTopicName(kLogXrdClHttp, "XrdClHttp");
84 
85  auto env = XrdCl::DefaultEnv::GetEnv();
86  if (!env) {
87  return;
88  }
89 
90  SetupX509();
91 
92  // The location for the client to write the statistics file; this will be dropped
93  // atomically every ~5 seconds and is meant to be complementary to the future g-stream.
94  env->PutString("HttpStatisticsLocation", "");
95  env->ImportString("HttpStatisticsLocation", "XRD_HTTPSTATISTICSLOCATION");
96  if (env->GetString("HttpStatisticsLocation", m_stats_location)) {
97  m_log->Debug(kLogXrdClHttp, "Will write client statistics to %s", m_stats_location.c_str());
98  } else {
99  m_log->Debug(kLogXrdClHttp, "Not writing client statistics to disk");
100  }
101  m_start = std::chrono::system_clock::now();
102 
103  // The minimum value we will accept from the request for a header timeout.
104  // (i.e., the amount of time the plugin will wait to receive headers from the remote server)
105  env->PutString("HttpMinimumHeaderTimeout", "");
106  env->ImportString("HttpMinimumHeaderTimeout", "XRD_HTTPMINIMUMHEADERTIMEOUT");
107 
108  // The default value of the header timeout (the amount of time the plugin will wait)
109  // to receive headers from the remote server.
110  env->PutString("HttpDefaultHeaderTimeout", "");
111  env->ImportString("HttpDefaultHeaderTimeout", "XRD_HTTPDEFAULTHEADERTIMEOUT");
112 
113  // The number of pending operations allowed in the global work queue.
114  env->PutInt("HttpMaxPendingOps", XrdClHttp::HandlerQueue::GetDefaultMaxPendingOps());
115  env->ImportInt("HttpMaxPendingOps", "XRD_HTTPMAXPENDINGOPS");
117  if (env->GetInt("HttpMaxPendingOps", max_pending)) {
118  if (max_pending <= 0 || max_pending > 10'000'000) {
119  m_log->Error(kLogXrdClHttp,
120  "Invalid value for the maximum number of pending operations in the global work queue (%d); using default value of %d",
121  max_pending,
124  env->PutInt("HttpMaxPendingOps", max_pending);
125  }
126  m_log->Debug(kLogXrdClHttp, "Using %d pending operations in the global work queue", max_pending);
127  }
128  m_queue.reset(new XrdClHttp::HandlerQueue(max_pending));
129 
130  // The number of threads to use for curl operations.
131  env->PutInt("HttpNumThreads", m_poll_threads);
132  env->ImportInt("HttpNumThreads", "XRD_HTTPNUMTHREADS");
133  int num_threads = m_poll_threads;
134  if (env->GetInt("HttpNumThreads", num_threads)) {
135  if (num_threads <= 0 || num_threads > 1'000) {
136  m_log->Error(kLogXrdClHttp, "Invalid value for the number of threads to use for curl operations (%d); using default value of %d", num_threads, m_poll_threads);
137  num_threads = m_poll_threads;
138  env->PutInt("HttpNumThreads", num_threads);
139  }
140  m_log->Debug(kLogXrdClHttp, "Using %d threads for curl operations", num_threads);
141  }
142 
143  // The stall timeout to use for transfer operations.
144  env->PutInt("HttpStallTimeout", XrdClHttp::CurlOperation::GetDefaultStallTimeout());
145  env->ImportInt("HttpStallTimeout", "XRD_HTTPSTALLTIMEOUT");
147  if (env->GetInt("HttpStallTimeout", stall_timeout)) {
148  if (stall_timeout < 0 || stall_timeout > 86'400) {
149  m_log->Error(kLogXrdClHttp, "Invalid value for the stall timeout (%d); using default value of %d", stall_timeout, XrdClHttp::CurlOperation::GetDefaultStallTimeout());
151  env->PutInt("HttpStallTimeout", stall_timeout);
152  }
153  m_log->Debug(kLogXrdClHttp, "Using %d seconds for the stall timeout", stall_timeout);
154  }
156 
157  // The slow transfer rate, in bytes per second, for timing out slow uploads/downloads.
158  env->PutInt("HttpSlowRateBytesSec", XrdClHttp::CurlOperation::GetDefaultSlowRateBytesSec());
159  env->ImportInt("HttpSlowRateBytesSec", "XRD_HTTPSLOWRATEBYTESSEC");
161  if (env->GetInt("HttpSlowRateBytesSec", slow_xfer_rate)) {
162  if (slow_xfer_rate < 0 || slow_xfer_rate > (1024 * 1024 * 1024)) {
163  m_log->Error(kLogXrdClHttp, "Invalid value for the slow transfer rate threshold (%d); using default value of %d", stall_timeout, XrdClHttp::CurlOperation::GetDefaultSlowRateBytesSec());
165  env->PutInt("HttpSlowRateBytesSec", slow_xfer_rate);
166  }
167  m_log->Debug(kLogXrdClHttp, "Using %d bytes/sec for the slow transfer rate threshold", slow_xfer_rate);
168  }
170 
171  // Determine the minimum header timeout. It's somewhat arbitrarily defaulted to 2s; below
172  // that and timeouts could be caused by OS scheduling noise. If the client has unreasonable
173  // expectations of the origin, we don't want to cause it to generate lots of origin-side load.
174  std::string val;
175  struct timespec mct{2, 0};
176  if (env->GetString("HttpMinimumHeaderTimeout", val) && !val.empty()) {
177  std::string errmsg;
178  if (!ParseTimeout(val, mct, errmsg)) {
179  m_log->Error(kLogXrdClHttp, "Failed to parse the minimum client timeout (%s): %s", val.c_str(), errmsg.c_str());
180  }
181  }
183 
184  struct timespec dht{9, 500'000'000};
185  if (env->GetString("HttpDefaultHeaderTimeout", val) && !val.empty()) {
186  std::string errmsg;
187  if (!ParseTimeout(val, dht, errmsg)) {
188  m_log->Error(kLogXrdClHttp, "Failed to parse the default header timeout (%s): %s", val.c_str(), errmsg.c_str());
189  }
190  }
192 
193  // Start up the cache for the OPTIONS response
194  auto &cache = XrdClHttp::VerbsCache::Instance();
195 
196  // Startup curl workers after we've set the configs to avoid race conditions
197  for (unsigned idx=0; idx<m_poll_threads; idx++) {
198  auto wk = std::make_unique<XrdClHttp::CurlWorker>(m_queue, cache, m_log);
199  auto wkp = wk.get();
200  std::thread t(XrdClHttp::CurlWorker::RunStatic, wkp);
201  wkp->Start(std::move(wk), std::move(t));
202  }
203 
204  std::thread t([this]{Monitor();});
205  m_monitor_tid = std::move(t);
206 
207  m_initialized = true;
208  });
209 }
210 
211 void
212 Factory::Monitor()
213 {
214  // This function is run in a separate thread to monitor the XrdClHttp statistics.
215  // It periodically saves the statistics to the stats file.
216  // Note: this previously had support for sending the statistics through the gstream.
217  // However, this was removed because gstream currently requires linking against XrdServer
218  // which is not available in the client; some further rearranging of headers and linkages
219  // is necessary.
220 
221  while (true) {
222  {
223  std::unique_lock lock(m_shutdown_lock);
224  m_shutdown_requested_cv.wait_for(
225  lock,
226  std::chrono::seconds(5),
227  []{return m_shutdown_requested;}
228  );
229  if (m_shutdown_requested) {
230  break;
231  }
232  }
233 
234  auto now = std::chrono::system_clock::now();
235 
236  std::string monitoring = "{\"event\": \"xrdclhttp\", "
237  "\"start\": " + std::to_string(std::chrono::duration<double>(m_start.time_since_epoch()).count()) + ","
238  "\"now\": " + std::to_string(std::chrono::duration<double>(now.time_since_epoch()).count()) + ","
239  "\"file\": " + File::GetMonitoringJson() + ","
240  "\"workers\": " + CurlWorker::GetMonitoringJson() + ","
241  "\"queues\": " + HandlerQueue::GetMonitoringJson() +
242  " }";
243  m_log->Info(kLogXrdClHttp, "Client monitoring statistics: %s", monitoring.c_str());
244  if (!m_stats_location.empty())
245  {
246  auto stats_tmp = m_stats_location + ".XXXXXX";
247  std::vector<char> stats_vector(stats_tmp.size() + 1, '\0');
248  memcpy(&stats_vector[0], stats_tmp.data(), stats_tmp.size() + 1);
249  auto fd = mkstemp(&stats_vector[0]);
250  if (fd == -1) {
251  m_log->Warning(kLogXrdClHttp, "Failed to create temporary stats file %s: %s", m_stats_location.c_str(), strerror(errno));
252  continue;
253  }
254  auto nb = write(fd, monitoring.data(), monitoring.size());
255  if (nb != static_cast<ssize_t>(monitoring.size())) {
256  if (nb == -1) m_log->Warning(kLogXrdClHttp, "Failed to write statistics into temporary file %s: %s", &stats_vector[0], strerror(errno));
257  else m_log->Warning(kLogXrdClHttp, "Failed to write statistics into temporary file %s: short write", &stats_vector[0]);
258  close(fd);
259  continue;
260  }
261  close(fd);
262  auto rv = rename(&stats_vector[0], m_stats_location.c_str());
263  if (rv) {
264  m_log->Warning(kLogXrdClHttp, "Failed to atomically rename stats file to final destination %s: %s", m_stats_location.c_str(), strerror(errno));
265  }
266  }
267  }
268 }
269 
270 namespace {
271 
272 void SetIfEmpty(XrdCl::Env *env, XrdCl::Log &log, const std::string &optName, const std::string &envName) {
273  if (!env) return;
274 
275  std::string val;
276  if (!env->GetString(optName, val) || val.empty()) {
277  env->PutString(optName, "");
278  env->ImportString(optName, envName);
279  }
280  if (env->GetString(optName, val) && !val.empty()) {
281  log.Info(kLogXrdClHttp, "Setting %s to value '%s'", optName.c_str(), val.c_str());
282  }
283 }
284 
285 } // namespace
286 
287 void
288 Factory::SetupX509() {
289 
290  auto env = XrdCl::DefaultEnv::GetEnv();
291  SetIfEmpty(env, *m_log, "HttpCertFile", "XRD_HTTPCERTFILE");
292  SetIfEmpty(env, *m_log, "HttpCertDir", "XRD_HTTPCERTDIR");
293  SetIfEmpty(env, *m_log, "HttpClientCertFile", "XRD_HTTPCLIENTCERTFILE");
294  SetIfEmpty(env, *m_log, "HttpClientKeyFile", "XRD_HTTPCLIENTKEYFILE");
295 
296  int disable_proxy = 0;
297  env->PutInt("HttpDisableX509", 0);
298  env->ImportInt("HttpDisableX509", "XRD_HTTPDISABLEX509");
299 
300  std::string filename;
301  char *filename_char;
302  if (!disable_proxy && (!env->GetString("HttpClientCertFile", filename) || filename.empty())) {
303  if ((filename_char = getenv("X509_USER_PROXY"))) {
304  filename = filename_char;
305  }
306  if (filename.empty()) {
307  filename = "/tmp/x509up_u" + std::to_string(geteuid());
308  }
309  if (access(filename.c_str(), R_OK) == 0) {
310  m_log->Debug(kLogXrdClHttp, "Using X509 proxy file found at %s for TLS client credential", filename.c_str());
311  env->PutString("HttpClientCertFile", filename);
312  env->PutString("HttpClientKeyFile", filename);
313  }
314  }
315  if ((!env->GetString("HttpCertDir", filename) || filename.empty()) && (filename_char = getenv("X509_CERT_DIR"))) {
316  env->PutString("HttpCertDir", filename_char);
317  }
318 }
319 
320 void
321 Factory::Shutdown()
322 {
323  {
324  std::unique_lock lock(m_shutdown_lock);
325  m_shutdown_requested = true;
326  m_shutdown_requested_cv.notify_one();
327  }
328  if (m_monitor_tid.joinable()) {
329  m_monitor_tid.join();
330  }
331 }
332 
333 void
334 Factory::Produce(std::unique_ptr<XrdClHttp::CurlOperation> operation)
335 {
336  m_queue->Produce(std::move(operation));
337 }
338 
340 Factory::CreateFile(const std::string & /*url*/) {
341  Initialize();
342  if (!m_initialized) {return nullptr;}
343  return new File(m_queue, m_log);
344 }
345 
347 Factory::CreateFileSystem(const std::string & url) {
348  Initialize();
349  if (!m_initialized) {return nullptr;}
350  return new Filesystem(url, m_queue, m_log);
351 }
352 
353 extern "C"
354 {
355  void *XrdClGetPlugIn(const void*)
356  {
357  return static_cast<void*>(new Factory());
358  }
359 }
XrdVERSIONINFO(XrdClGetPlugIn, XrdClGetPlugIn) using namespace XrdClHttp
void * XrdClGetPlugIn(const void *)
int rename(const char *oldpath, const char *newpath)
int access(const char *path, int amode)
#define close(a)
Definition: XrdPosix.hh:48
#define write(a, b, c)
Definition: XrdPosix.hh:123
XrdOucString File
static void SetSlowRateBytesSec(int rate)
static void SetStallTimeout(int stall_interval)
static int GetDefaultSlowRateBytesSec()
static int GetDefaultStallTimeout()
static void RunStatic(CurlWorker *myself)
static std::string GetMonitoringJson()
void Produce(std::unique_ptr< XrdClHttp::CurlOperation > operation)
virtual XrdCl::FileSystemPlugIn * CreateFileSystem(const std::string &url) override
Create a file system plug-in for the given URL.
virtual XrdCl::FilePlugIn * CreateFile(const std::string &url) override
Create a file plug-in for the given URL.
static void SetDefaultHeaderTimeout(struct timespec &ts)
static std::string GetMonitoringJson()
static void SetMinimumHeaderTimeout(struct timespec &ts)
static std::string GetMonitoringJson()
static unsigned GetDefaultMaxPendingOps()
static VerbsCache & Instance()
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition: XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition: XrdClEnv.cc:52
bool ImportString(const std::string &key, const std::string &shellKey)
Definition: XrdClEnv.cc:214
bool ImportInt(const std::string &key, const std::string &shellKey)
Definition: XrdClEnv.cc:185
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
An interface for file plug-ins.
Handle diagnostics.
Definition: XrdClLog.hh:101
void SetTopicName(uint64_t topic, std::string name)
Map a topic number to a string.
Definition: XrdClLog.cc:163
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
const int DefaultRequestTimeout
const uint64_t kLogXrdClHttp