XRootD
XrdClHttpWorker.hh
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #ifndef XRDCLHTTPWORKER_HH
22 #define XRDCLHTTPWORKER_HH
23 
24 #include "XrdClHttpOps.hh"
25 
26 #include <array>
27 #include <atomic>
28 #include <chrono>
29 #include <condition_variable>
30 #include <memory>
31 #include <mutex>
32 #include <unordered_map>
33 #include <unordered_set>
34 
// Declares CURL as an alias for void so that CURL* (the key type of
// CurlWorker::m_op_map below) can be named without including another header.
// NOTE(review): presumably mirrors libcurl's own `typedef void CURL;` -- confirm.
35 typedef void CURL;
36 
// Forward declarations of XrdCl types. Within this header only XrdCl::Log is
// actually named (as a pointer parameter and member of CurlWorker); Env,
// ResponseHandler and URL are declared but not referenced in the code shown.
37 namespace XrdCl {
38 
39 class Env;
40 class Log;
41 class ResponseHandler;
42 class URL;
43 
44 }
45 
46 namespace XrdClHttp {
47 
   // Forward declarations: below, HandlerQueue is only held through
   // std::shared_ptr and VerbsCache only by reference, so incomplete types suffice.
48 class HandlerQueue;
49 class VerbsCache;
50 
   // CurlWorker
   //
   // Declares a worker object that holds a shared HandlerQueue, a reference to a
   // VerbsCache, a logger pointer, a std::thread handle and a map keyed by CURL*.
   // It also declares class-wide (static) state: the list of all workers and a
   // set of atomic monitoring counters. All non-inline members are defined
   // outside this header.
   //
   // The copy constructor is deleted, so workers cannot be copy-constructed.
   //
   // NOTE(review): this header names std::thread, std::string, std::tuple,
   // std::vector and uint64_t but does not directly include <thread>, <string>,
   // <tuple>, <vector> or <cstdint>; presumably they arrive through
   // "XrdClHttpOps.hh" -- confirm, or include them here.
51 class CurlWorker {
52 public:
    // Constructs a worker from a shared handler queue, a verbs cache (taken by
    // reference, so the cache must outlive the worker) and a logger pointer.
    // Presumably these initialize m_queue, m_cache and m_logger -- confirm in
    // the implementation file.
53  CurlWorker(std::shared_ptr<HandlerQueue> queue, VerbsCache &cache, XrdCl::Log* logger);
54 
    // Copy construction is disabled.
55  CurlWorker(const CurlWorker &) = delete;
56 
    // Run() is the per-object entry point; RunStatic() is its static
    // counterpart taking the worker as a raw pointer. Presumably RunStatic is
    // the function handed to std::thread and forwards to myself->Run() --
    // confirm in the implementation file.
57  void Run();
58  static void RunStatic(CurlWorker *myself);
59 
60  // Passes some initial values to the worker so it can start.
    // Both arguments are taken by value, i.e. ownership of the worker object
    // and of the thread handle is transferred to the callee.
61  void Start(std::unique_ptr<XrdClHttp::CurlWorker> self, std::thread tid);
62 
63  // Returns the configured X509 client certificate and key file names.
    // NOTE(review): tuple order is presumably (certificate, key), matching the
    // function name -- confirm.
64  std::tuple<std::string, std::string> ClientX509CertKeyFile() const;
65 
66  // Change the period (in seconds) for queue maintenance.
67  //
68  // Defaults to 5 seconds; smaller values are convenient for unit tests.
    // The value is shared by all workers (static) and is written with relaxed
    // memory ordering, so the store does not order other memory operations.
69  static void SetMaintenancePeriod(unsigned maint) {
70  m_maintenance_period.store(maint, std::memory_order_relaxed);
71  }
72 
    // Returns monitoring information as a string; judging by the name the
    // payload is JSON (the definition is not part of this header).
73  static std::string GetMonitoringJson();
74 
75 private:
76  // Invoked by the destructor of one of our static members. This fires when
77  // the plugin is unloaded and triggers the shutdown of each of the worker threads.
78  static void ShutdownAll();
79 
80  // Invoked by ShutdownAll; kills off the current object's thread.
81  void Shutdown();
82 
83  // A list of all known worker threads -- used to shut down the process.
84  static std::vector<std::unique_ptr<XrdClHttp::CurlWorker>> m_workers;
85  // Protects the data in m_workers.
86  static std::mutex m_workers_mutex;
87 
    // Steady-clock timestamp; by its name, when a "prefix" log line was last
    // emitted -- usage is not visible in this header.
88  std::chrono::steady_clock::time_point m_last_prefix_log;
89  VerbsCache &m_cache; // Cache mapping server URLs to the list of selected HTTP verbs.
    // Queue shared with whoever constructed the worker.
90  std::shared_ptr<HandlerQueue> m_queue;
91 
92  // Queue for operations that can be unpaused.
93  // Paused operations occur when a PUT is started but cannot be continued
94  // because more data is needed from the caller.
95  std::shared_ptr<HandlerQueue> m_continue_queue;
96 
    // Maps a CURL* handle to its operation paired with a system_clock time
    // point. NOTE(review): what the time point denotes (start time? deadline?)
    // is not visible here -- confirm against the implementation.
97  std::unordered_map<CURL*, std::pair<std::shared_ptr<CurlOperation>, std::chrono::system_clock::time_point>> m_op_map;
98  XrdCl::Log* m_logger;
    // File names reported by ClientX509CertKeyFile(), presumably -- confirm.
99  std::string m_x509_client_cert_file;
100  std::string m_x509_client_key_file;
101 
    // Class-wide constant fixed at 20; by its name an upper bound on
    // operations -- where it is enforced is not shown here.
102  const static unsigned m_max_ops{20};
    // Maintenance period shared by all workers; written by SetMaintenancePeriod().
103  static std::atomic<unsigned> m_maintenance_period;
104 
105  // File descriptor pair indicating shutdown is requested.
    // Both default to -1 until assigned elsewhere.
106  int m_shutdown_pipe_r{-1};
107  int m_shutdown_pipe_w{-1};
108  // Mutex for managing the startup of a worker
109  std::mutex m_start_lock;
110  // Condition variable for a worker to indicate to RunStatic that it is ready
111  std::condition_variable m_start_complete_cv;
112  // Flag indicating that Start has been called.
113  bool m_start_complete{false};
114  // The worker's thread object
115  std::thread m_self_tid;
116 
117  // Monitoring statistics
    // One set of counters; every field is an atomic that is value-initialized
    // via its `{}` default member initializer.
    // NOTE(review): m_duration uses system_clock's duration rep while
    // m_pause_duration uses steady_clock's -- confirm this is intentional.
118  struct OpStats {
119  std::atomic<uint64_t> m_conncall_timeout{}; // Timeout due to the connection callout mechanism
120  std::atomic<uint64_t> m_client_timeout{};
121  std::atomic<std::chrono::system_clock::duration::rep> m_duration{};
122  std::atomic<uint64_t> m_error{};
123  std::atomic<uint64_t> m_finished{};
124  std::atomic<std::chrono::steady_clock::duration::rep> m_pause_duration{};
125  std::atomic<uint64_t> m_started{};
126  std::atomic<uint64_t> m_server_timeout{};
127  std::atomic<uint64_t> m_bytes{};
128  };
129 
    // Event kinds accepted by OpRecord(). Most enumerators share a name stem
    // with an OpStats field (e.g. Error / m_error); the exact mapping lives in
    // the implementation.
130  enum class OpKind {
131  ConncallTimeout,
132  ClientTimeout,
133  Error,
134  Finish,
135  Start,
136  ServerTimeout,
137  Update
138  };
    // Records an event of the given kind for `op`; presumably updates the
    // matching OpStats entry in m_ops -- confirm in the implementation file.
139  void OpRecord(XrdClHttp::CurlOperation &op, OpKind);
140 
    // Class-wide counters for the connection callout mechanism (declared here,
    // defined and initialized elsewhere).
141  static std::atomic<uint64_t> m_conncall_errors;
142  static std::atomic<uint64_t> m_conncall_req;
143  static std::atomic<uint64_t> m_conncall_success;
144  static std::atomic<uint64_t> m_conncall_timeout;
    // Two-dimensional table of OpStats: the outer dimension has one entry per
    // CurlOperation::HttpVerb (sized by HttpVerb::Count), the inner dimension
    // has 403 entries. NOTE(review): the meaning of 403 is not visible here --
    // a named constant would document it.
145  static std::array<std::array<OpStats, 403>, static_cast<size_t>(XrdClHttp::CurlOperation::HttpVerb::Count)> m_ops;
    // Per-worker atomics. NOTE(review): unlike the OpStats fields these have
    // no default member initializer in this header -- confirm the constructor
    // initializes them before they are read.
146  std::atomic<std::chrono::system_clock::rep> m_last_completed_cycle;
147  std::atomic<std::chrono::system_clock::rep> m_oldest_op;
148 
149  // Vectors tracking known worker statistics.
    // They hold raw pointers to atomics; presumably each worker registers the
    // addresses of its m_last_completed_cycle / m_oldest_op at index
    // m_stats_offset, guarded by m_worker_stats_mutex -- confirm.
150  static std::vector<std::atomic<std::chrono::system_clock::rep>*> m_workers_last_completed_cycle;
151  static std::vector<std::atomic<std::chrono::system_clock::rep>*> m_workers_oldest_op;
152  size_t m_stats_offset{0};
153  static std::mutex m_worker_stats_mutex;
154 
155  // Shutdown trigger: a static instance of a helper type with a user-declared
    // constructor and destructor (both defined elsewhere). Presumably this is
    // the static member whose destructor invokes ShutdownAll() -- confirm.
156  static struct initcontrol {
157  initcontrol();
158  ~initcontrol();
159  } m_initcontrol;
160 };
161 
162 }
163 
164 #endif // XRDCLHTTPWORKER_HH
void CURL
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
CurlWorker(std::shared_ptr< HandlerQueue > queue, VerbsCache &cache, XrdCl::Log *logger)
CurlWorker(const CurlWorker &)=delete
static void RunStatic(CurlWorker *myself)
void Start(std::unique_ptr< XrdClHttp::CurlWorker > self, std::thread tid)
static void SetMaintenancePeriod(unsigned maint)
static std::string GetMonitoringJson()
Handle diagnostics.
Definition: XrdClLog.hh:101
XrdSysError Log
Definition: XrdConfig.cc:113