XRootD
XrdThrottleManager.hh
Go to the documentation of this file.
1 
2 /*
3  * XrdThrottleManager
4  *
5  * This class provides an implementation of a throttle manager.
6  * The throttle manager purposely pauses if the bandwidth, IOPS
7  * rate, or number of outstanding IO requests is sustained above
8  * a certain level.
9  *
10  * The XrdThrottleManager is user-aware and provides fairshare.
11  *
12  * This works by having a separate thread periodically refilling
13  * each user's shares.
14  *
15  * Note that we do not actually keep close track of users, but rather
16  * put them into a hash. This way, we can pretend there's a constant
17  * number of users and use a lock-free algorithm.
18  */
19 
20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
22 
// Branch-prediction hint macros.
//
// With GCC-compatible compilers these forward to __builtin_expect after
// normalizing the condition to 0/1.  The portable fallback carries no hint but
// expands to the same parenthesized, normalized value so that an expression
// such as `!likely(a || b)` or `likely(a || b) && c` evaluates identically on
// every compiler (a bare `x` expansion would change operator precedence).
#ifdef __GNUC__
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#define likely(x) (!!(x))
#define unlikely(x) (!!(x))
#endif
30 
31 #include <array>
32 #include <ctime>
33 #include <condition_variable>
34 #include <memory>
35 #include <mutex>
36 #include <string>
37 #include <shared_mutex>
38 #include <unordered_map>
39 #include <vector>
40 
41 #include "XrdSys/XrdSysRAtomic.hh"
42 #include "XrdSys/XrdSysPthread.hh"
43 
44 class XrdSecEntity;
45 class XrdSysError;
46 class XrdOucTrace;
47 class XrdThrottleTimer;
48 class XrdXrootdGStream;
49 
50 namespace XrdThrottle {
51  class Configuration;
52 }
53 
55 {
56 
57 friend class XrdThrottleTimer;
58 
59 public:
60 
61 void Init();
62 
63 bool OpenFile(const std::string &entity, std::string &open_error_message);
64 bool CloseFile(const std::string &entity);
65 
66 void Apply(int reqsize, int reqops, int uid);
67 
69 
70 bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
71 
72 // Returns the user name and UID for the given client.
73 //
74 // The UID is a hash of the user name; it is not guaranteed to be unique.
75 std::tuple<std::string, uint16_t> GetUserInfo(const XrdSecEntity *client);
76 
77 void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
78  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
79  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
80 
81 void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
82  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
83 
84 void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
85 
86 void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
87 
88 void SetMaxWait(unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
89 
90 // Load per-user limits from configuration file
91 // Returns 0 on success, non-zero on failure
92 int LoadUserLimits(const std::string &config_file);
93 
94 // Reload per-user limits (for runtime reloading)
95 int ReloadUserLimits();
96 
97 // Get per-user connection limit for a given username
98 // Returns 0 if no per-user limit is set (use global), otherwise returns the limit
99 unsigned long GetUserMaxConn(const std::string &username);
100 
101 void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
102 
103 //int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
104 
105 // Notify that an I/O operation has started for a given user.
106 //
107 // If we are at the maximum concurrency limit then this will block;
108 // if we block for too long, the `ok` output parameter is set to false.
109 XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok);
110 
111 void PrepLoadShed(const char *opaque, std::string &lsOpaque);
112 
113 bool CheckLoadShed(const std::string &opaque);
114 
115 void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
116 
118 
119  ~XrdThrottleManager() {} // The buffmanager is never deleted
120 
121 protected:
122 
123 // Notify the manager an I/O operation has completed for a given user.
124 // This is used to update the I/O wait time for the user and, potentially,
125 // wake up a waiting thread.
126 void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
127 
128 private:
129 
130 // Determine the UID for a given user name.
131 // This is a hash of the username; it is not guaranteed to be unique.
132 // The UID is used to index into the waiters array and must be less than m_max_users.
133 uint16_t GetUid(const std::string &);
134 
135 void Recompute();
136 
137 void RecomputeInternal();
138 
139 static
140 void * RecomputeBootstrap(void *pp);
141 
142 // Compute the order of wakeups for the existing waiters.
143 void ComputeWaiterOrder();
144 
145 // Walk through the outstanding IO operations and compute the per-user
146 // IO time.
147 //
148 // Meant to be done periodically as part of the Recompute interval. Used
149 // to make sure we have a better estimate of the concurrency for each user.
150 void UserIOAccounting();
151 
152 int WaitForShares();
153 
154 void GetShares(int &shares, int &request);
155 
156 void StealShares(int uid, int &reqsize, int &reqops);
157 
158 // Return the timer hash list ID to use for the current request.
159 //
160 // When on Linux, this will hash across the CPU ID; the goal is to distribute
161 // the different timers across several lists to avoid mutex contention.
162 static unsigned GetTimerListHash();
163 
164 // Notify a single waiter thread that it can proceed.
165 void NotifyOne();
166 
167 XrdOucTrace * m_trace;
168 XrdSysError * m_log;
169 
170 XrdSysCondVar m_compute_var;
171 
172 // Controls for the various rates.
173 float m_interval_length_seconds;
174 float m_bytes_per_second;
175 float m_ops_per_second;
176 int m_concurrency_limit;
177 
178 // Maintain the shares
179 
180 static constexpr int m_max_users = 1024; // Maximum number of users we can have; used for various fixed-size arrays.
181 std::vector<int> m_primary_bytes_shares;
182 std::vector<int> m_secondary_bytes_shares;
183 std::vector<int> m_primary_ops_shares;
184 std::vector<int> m_secondary_ops_shares;
185 int m_last_round_allocation;
186 
187 // Waiter counts for each user
188 struct alignas(64) Waiter
189 {
190  std::condition_variable m_cv; // Condition variable for waiters of this user.
191  std::mutex m_mutex; // Mutex for this structure
192  unsigned m_waiting{0}; // Number of waiting operations for this user.
193 
194  // EWMA of the concurrency for this user. This is used to determine how much
195  // above / below the user's concurrency share they've been recently. This subsequently
196  // will affect the likelihood of being woken up.
197  XrdSys::RAtomic<float> m_concurrency{0};
198 
199  // I/O time for this user since the last recompute interval. The value is used
200  // to compute the EWMA of the concurrency (m_concurrency).
202 
203  // Pointer to the XrdThrottleManager object that owns this waiter.
204  XrdThrottleManager *m_manager{nullptr};
205 
206  // Causes the current thread to wait until it's the user's turn to wake up.
207  bool Wait();
208 
209  // Wakes up one I/O operation for this user.
210  void NotifyOne(std::unique_lock<std::mutex> lock)
211  {
212  m_cv.notify_one();
213  }
214 };
215 std::array<Waiter, m_max_users> m_waiter_info;
216 
217 // Array with the wake up ordering of the waiter users.
218 // Every recompute interval, we compute how much over the concurrency limit
219 // each user is, quantize this to an integer number of shares and then set the
220 // array value to the user ID (so if user ID 5 has two shares, then there are two
221 // entries with value 5 in the array). The array is then shuffled to randomize the
222 // order of the wakeup.
223 //
224 // All reads and writes to the wake order array are meant to be relaxed atomics; if a thread
225 // has an outdated view of the array, it simply means that a given user might get slightly
226 // incorrect random probability of being woken up. That's seen as acceptable to keep
227 // the selection algorithm lock and fence-free.
228 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
229 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1; // A second wake order array; every recompute interval, we will swap the active array, avoiding locks.
230 XrdSys::RAtomic<char> m_wake_order_active; // The current active wake order array; 0 or 1
231 std::atomic<size_t> m_waiter_offset{0}; // Offset inside the wake order array; this is used to wake up the next potential user in line. Cannot be relaxed atomic as offsets need to be seen in order.
232 std::chrono::steady_clock::time_point m_last_waiter_recompute_time; // Last time we recomputed the wait ordering.
233 XrdSys::RAtomic<unsigned> m_waiting_users{0}; // Number of users waiting behind the throttle as of the last recompute time.
234 
235 std::atomic<uint32_t> m_io_active; // Count of in-progress IO operations: cannot be a relaxed atomic as ordering of inc/dec matters.
236 XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_active_time; // Total IO wait time recorded since the last recompute interval; reset to zero about every second.
237 XrdSys::RAtomic<uint64_t> m_io_total{0}; // Monotonically increasing count of IO operations; reset to zero about every second.
238 
239 int m_stable_io_active{0}; // Number of IO operations in progress as of the last recompute interval; must hold m_compute_var lock when reading/writing.
240 uint64_t m_stable_io_total{0}; // Total IO operations since startup. Recomputed every second; must hold m_compute_var lock when reading/writing.
241 
242 std::chrono::steady_clock::duration m_stable_io_wait; // Total IO wait time as of the last recompute interval.
243 
244 // Load shed details
245 std::string m_loadshed_host;
246 unsigned m_loadshed_port;
247 unsigned m_loadshed_frequency;
248 
249 // The number of times we have an I/O operation that hit the concurrency limit.
250 // This is monotonically increasing and is "relaxed" because it's purely advisory;
251 // ordering of the increments between threads is not important.
252 XrdSys::RAtomic<int> m_loadshed_limit_hit;
253 
254 // Maximum number of open files (m_max_open) and connections (m_max_conns)
255 unsigned long m_max_open{0};
256 unsigned long m_max_conns{0};
257 std::unordered_map<std::string, unsigned long> m_file_counters;
258 std::unordered_map<std::string, unsigned long> m_conn_counters;
259 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
260 std::mutex m_file_mutex;
261 
262 // Per-user connection limits
// Connection limit settings for one user entry.
struct UserLimit {
 // Maximum number of connections; 0 means no limit (use global).
 unsigned long max_conn = 0;
 // True if this is a wildcard pattern.
 bool is_wildcard = false;
};
267 std::unordered_map<std::string, UserLimit> m_user_limits;
268 std::shared_mutex m_user_limits_mutex;
269 std::string m_user_config_file;
270 
271 // Track the ongoing I/O operations. We have several linked lists (hashed on the
272 // CPU ID) of I/O operations that are in progress. This way, we can periodically sum
273 // up the time spent in ongoing operations - which is important for operations that
274 // last longer than the recompute interval.
275 struct TimerList {
276  std::mutex m_mutex;
277  XrdThrottleTimer *m_first{nullptr};
278  XrdThrottleTimer *m_last{nullptr};
279 };
280 #if defined(__linux__)
281 static constexpr size_t m_timer_list_size = 32;
282 #else
283 static constexpr size_t m_timer_list_size = 1;
284 #endif
285 std::array<TimerList, m_timer_list_size> m_timer_list; // A vector of linked lists of I/O operations. We keep track of multiple instead of a single one to avoid a global mutex.
286 
287 // Maximum wait time for a user to perform an I/O operation before failing.
288 // Most clients have some sort of operation timeout; after that point, if we go
289 // ahead and do the work, it's wasted effort as the client has gone.
290 std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
291 
292 // Monitoring handle, if configured
293 XrdXrootdGStream* m_gstream{nullptr};
294 
295 static const char *TraceID;
296 
297 };
298 
300 {
301 
302 friend class XrdThrottleManager;
303 
304 public:
305 
307 {
308  if (m_manager) {
309  StopTimer();
310  }
311 }
312 
313 protected:
314 
316  m_start_time(std::chrono::steady_clock::time_point::min())
317 {}
318 
320  m_owner(uid),
321  m_timer_list_entry(XrdThrottleManager::GetTimerListHash()),
322  m_manager(manager),
323  m_start_time(std::chrono::steady_clock::now())
324 {
325  if (!m_manager) {
326  return;
327  }
328  auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
329  std::lock_guard<std::mutex> lock(timerList.m_mutex);
330  if (timerList.m_first == nullptr) {
331  timerList.m_first = this;
332  } else {
333  m_prev = timerList.m_last;
334  m_prev->m_next = this;
335  }
336  timerList.m_last = this;
337 }
338 
339 std::chrono::steady_clock::duration Reset() {
340  auto now = std::chrono::steady_clock::now();
341  auto last_start = m_start_time.exchange(now);
342  return now - last_start;
343 }
344 
345 private:
346 
347  void StopTimer()
348  {
349  if (!m_manager) return;
350 
351  auto event_duration = Reset();
352  auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
353  {
354  std::unique_lock<std::mutex> lock(timerList.m_mutex);
355  if (m_prev) {
356  m_prev->m_next = m_next;
357  if (m_next) {
358  m_next->m_prev = m_prev;
359  } else {
360  timerList.m_last = m_prev;
361  }
362  } else {
363  timerList.m_first = m_next;
364  if (m_next) {
365  m_next->m_prev = nullptr;
366  } else {
367  timerList.m_last = nullptr;
368  }
369  }
370  }
371  m_manager->StopIOTimer(event_duration, m_owner);
372  }
373 
374  const uint16_t m_owner{0};
375  const uint16_t m_timer_list_entry{0};
376  XrdThrottleManager *m_manager{nullptr};
377  XrdThrottleTimer *m_prev{nullptr};
378  XrdThrottleTimer *m_next{nullptr};
380 
381 };
382 
383 #endif
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
unsigned long GetUserMaxConn(const std::string &username)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
std::chrono::steady_clock::duration Reset()