XRootD
XrdThrottleManager.hh
Go to the documentation of this file.
1 
2 /*
3  * XrdThrottleManager
4  *
5  * This class provides an implementation of a throttle manager.
6  * The throttle manager purposely pauses clients if the bandwidth, IOPS
7  * rate, or number of outstanding IO requests is sustained above
8  * a certain level.
9  *
10  * The XrdThrottleManager is user-aware and provides fairshare.
11  *
12  * This works by having a separate thread periodically refilling
13  * each user's shares.
14  *
15  * Note that we do not actually keep close track of users, but rather
16  * put them into a hash. This way, we can pretend there's a constant
17  * number of users and use a lock-free algorithm.
18  */
19 
20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
22 
23 #ifdef __GNUC__
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
26 #else
27 #define likely(x) x
28 #define unlikely(x) x
29 #endif
30 
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

#include "XrdSys/XrdSysRAtomic.hh"
#include "XrdSys/XrdSysPthread.hh"
42 
43 class XrdSecEntity;
44 class XrdSysError;
45 class XrdOucTrace;
46 class XrdThrottleTimer;
47 class XrdXrootdGStream;
48 
49 namespace XrdThrottle {
50  class Configuration;
51 }
52 
54 {
55 
56 friend class XrdThrottleTimer;
57 
58 public:
59 
60 void Init();
61 
62 bool OpenFile(const std::string &entity, std::string &open_error_message);
63 bool CloseFile(const std::string &entity);
64 
65 void Apply(int reqsize, int reqops, int uid);
66 
68 
69 bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
70 
71 // Returns the user name and UID for the given client.
72 //
73 // The UID is a hash of the user name; it is not guaranteed to be unique.
74 std::tuple<std::string, uint16_t> GetUserInfo(const XrdSecEntity *client);
75 
76 void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
77  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
78  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
79 
80 void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
81  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
82 
83 void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
84 
85 void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
86 
87 void SetMaxWait(unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
88 
89 void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
90 
91 //int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
92 
93 // Notify that an I/O operation has started for a given user.
94 //
95 // If we are at the maximum concurrency limit then this will block;
96 // if we block for too long, the second return value will return false.
97 XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok);
98 
99 void PrepLoadShed(const char *opaque, std::string &lsOpaque);
100 
101 bool CheckLoadShed(const std::string &opaque);
102 
103 void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
104 
106 
107  ~XrdThrottleManager() {} // The buffmanager is never deleted
108 
109 protected:
110 
111 // Notify the manager an I/O operation has completed for a given user.
112 // This is used to update the I/O wait time for the user and, potentially,
113 // wake up a waiting thread.
114 void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
115 
116 private:
117 
118 // Determine the UID for a given user name.
119 // This is a hash of the username; it is not guaranteed to be unique.
120 // The UID is used to index into the waiters array and cannot be more than m_max_users.
121 uint16_t GetUid(const std::string &);
122 
123 void Recompute();
124 
125 void RecomputeInternal();
126 
127 static
128 void * RecomputeBootstrap(void *pp);
129 
130 // Compute the order of wakeups for the existing waiters.
131 void ComputeWaiterOrder();
132 
133 // Walk through the outstanding IO operations and compute the per-user
134 // IO time.
135 //
136 // Meant to be done periodically as part of the Recompute interval. Used
137 // to make sure we have a better estimate of the concurrency for each user.
138 void UserIOAccounting();
139 
140 int WaitForShares();
141 
142 void GetShares(int &shares, int &request);
143 
144 void StealShares(int uid, int &reqsize, int &reqops);
145 
146 // Return the timer hash list ID to use for the current request.
147 //
148 // When on Linux, this will hash across the CPU ID; the goal is to distribute
149 // the different timers across several lists to avoid mutex contention.
150 static unsigned GetTimerListHash();
151 
152 // Notify a single waiter thread that it can proceed.
153 void NotifyOne();
154 
155 XrdOucTrace * m_trace;
156 XrdSysError * m_log;
157 
158 XrdSysCondVar m_compute_var;
159 
160 // Controls for the various rates.
161 float m_interval_length_seconds;
162 float m_bytes_per_second;
163 float m_ops_per_second;
164 int m_concurrency_limit;
165 
166 // Maintain the shares
167 
168 static constexpr int m_max_users = 1024; // Maximum number of users we can have; used for various fixed-size arrays.
169 std::vector<int> m_primary_bytes_shares;
170 std::vector<int> m_secondary_bytes_shares;
171 std::vector<int> m_primary_ops_shares;
172 std::vector<int> m_secondary_ops_shares;
173 int m_last_round_allocation;
174 
175 // Waiter counts for each user
176 struct alignas(64) Waiter
177 {
178  std::condition_variable m_cv; // Condition variable for waiters of this user.
179  std::mutex m_mutex; // Mutex for this structure
180  unsigned m_waiting{0}; // Number of waiting operations for this user.
181 
182  // EWMA of the concurrency for this user. This is used to determine how much
183  // above / below the user's concurrency share they've been recently. This subsequently
184  // will affect the likelihood of being woken up.
185  XrdSys::RAtomic<float> m_concurrency{0};
186 
187  // I/O time for this user since the last recompute interval. The value is used
188  // to compute the EWMA of the concurrency (m_concurrency).
190 
191  // Pointer to the XrdThrottleManager object that owns this waiter.
192  XrdThrottleManager *m_manager{nullptr};
193 
194  // Causes the current thread to wait until it's the user's turn to wake up.
195  bool Wait();
196 
197  // Wakes up one I/O operation for this user.
198  void NotifyOne(std::unique_lock<std::mutex> lock)
199  {
200  m_cv.notify_one();
201  }
202 };
203 std::array<Waiter, m_max_users> m_waiter_info;
204 
205 // Array with the wake up ordering of the waiter users.
206 // Every recompute interval, we compute how much over the concurrency limit
207 // each user is, quantize this to an integer number of shares and then set the
208 // array value to the user ID (so if user ID 5 has two shares, then there are two
209 // entries with value 5 in the array). The array is then shuffled to randomize the
210 // order of the wakeup.
211 //
212 // All reads and writes to the wake order array are meant to be relaxed atomics; if a thread
213 // has an outdated view of the array, it simply means that a given user might get slightly
214 // incorrect random probability of being woken up. That's seen as acceptable to keep
215 // the selection algorithm lock and fence-free.
216 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
217 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1; // A second wake order array; every recompute interval, we will swap the active array, avoiding locks.
218 XrdSys::RAtomic<char> m_wake_order_active; // The current active wake order array; 0 or 1
219 std::atomic<size_t> m_waiter_offset{0}; // Offset inside the wake order array; this is used to wake up the next potential user in line. Cannot be relaxed atomic as offsets need to be seen in order.
220 std::chrono::steady_clock::time_point m_last_waiter_recompute_time; // Last time we recomputed the wait ordering.
221 XrdSys::RAtomic<unsigned> m_waiting_users{0}; // Number of users waiting behind the throttle as of the last recompute time.
222 
223 std::atomic<uint32_t> m_io_active; // Count of in-progress IO operations: cannot be a relaxed atomic as ordering of inc/dec matters.
224 XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_active_time; // Total IO wait time recorded since the last recompute interval; reset to zero about every second.
225 XrdSys::RAtomic<uint64_t> m_io_total{0}; // Monotonically increasing count of IO operations; reset to zero about every second.
226 
227 int m_stable_io_active{0}; // Number of IO operations in progress as of the last recompute interval; must hold m_compute_var lock when reading/writing.
228 uint64_t m_stable_io_total{0}; // Total IO operations since startup. Recomputed every second; must hold m_compute_var lock when reading/writing.
229 
230 std::chrono::steady_clock::duration m_stable_io_wait; // Total IO wait time as of the last recompute interval.
231 
232 // Load shed details
233 std::string m_loadshed_host;
234 unsigned m_loadshed_port;
235 unsigned m_loadshed_frequency;
236 
237 // The number of times we have an I/O operation that hit the concurrency limit.
238 // This is monotonically increasing and is "relaxed" because it's purely advisory;
239 // ordering of the increments between threads is not important.
240 XrdSys::RAtomic<int> m_loadshed_limit_hit;
241 
242 // Maximum number of open files
243 unsigned long m_max_open{0};
244 unsigned long m_max_conns{0};
245 std::unordered_map<std::string, unsigned long> m_file_counters;
246 std::unordered_map<std::string, unsigned long> m_conn_counters;
247 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
248 std::mutex m_file_mutex;
249 
250 // Track the ongoing I/O operations. We have several linked lists (hashed on the
251 // CPU ID) of I/O operations that are in progress. This way, we can periodically sum
252 // up the time spent in ongoing operations - which is important for operations that
253 // last longer than the recompute interval.
254 struct TimerList {
255  std::mutex m_mutex;
256  XrdThrottleTimer *m_first{nullptr};
257  XrdThrottleTimer *m_last{nullptr};
258 };
259 #if defined(__linux__)
260 static constexpr size_t m_timer_list_size = 32;
261 #else
262 static constexpr size_t m_timer_list_size = 1;
263 #endif
264 std::array<TimerList, m_timer_list_size> m_timer_list; // A vector of linked lists of I/O operations. We keep track of multiple instead of a single one to avoid a global mutex.
265 
266 // Maximum wait time for a user to perform an I/O operation before failing.
267 // Most clients have some sort of operation timeout; after that point, if we go
268 // ahead and do the work, it's wasted effort as the client has gone.
269 std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
270 
271 // Monitoring handle, if configured
272 XrdXrootdGStream* m_gstream{nullptr};
273 
274 static const char *TraceID;
275 
276 };
277 
279 {
280 
281 friend class XrdThrottleManager;
282 
283 public:
284 
286 {
287  if (m_manager) {
288  StopTimer();
289  }
290 }
291 
292 protected:
293 
295  m_start_time(std::chrono::steady_clock::time_point::min())
296 {}
297 
299  m_owner(uid),
300  m_timer_list_entry(XrdThrottleManager::GetTimerListHash()),
301  m_manager(manager),
302  m_start_time(std::chrono::steady_clock::now())
303 {
304  if (!m_manager) {
305  return;
306  }
307  auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
308  std::lock_guard<std::mutex> lock(timerList.m_mutex);
309  if (timerList.m_first == nullptr) {
310  timerList.m_first = this;
311  } else {
312  m_prev = timerList.m_last;
313  m_prev->m_next = this;
314  }
315  timerList.m_last = this;
316 }
317 
318 std::chrono::steady_clock::duration Reset() {
319  auto now = std::chrono::steady_clock::now();
320  auto last_start = m_start_time.exchange(now);
321  return now - last_start;
322 }
323 
324 private:
325 
326  void StopTimer()
327  {
328  if (!m_manager) return;
329 
330  auto event_duration = Reset();
331  auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
332  {
333  std::unique_lock<std::mutex> lock(timerList.m_mutex);
334  if (m_prev) {
335  m_prev->m_next = m_next;
336  if (m_next) {
337  m_next->m_prev = m_prev;
338  } else {
339  timerList.m_last = m_prev;
340  }
341  } else {
342  timerList.m_first = m_next;
343  if (m_next) {
344  m_next->m_prev = nullptr;
345  } else {
346  timerList.m_last = nullptr;
347  }
348  }
349  }
350  m_manager->StopIOTimer(event_duration, m_owner);
351  }
352 
353  const uint16_t m_owner{0};
354  const uint16_t m_timer_list_entry{0};
355  XrdThrottleManager *m_manager{nullptr};
356  XrdThrottleTimer *m_prev{nullptr};
357  XrdThrottleTimer *m_next{nullptr};
359 
360 };
361 
362 #endif
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
std::chrono::steady_clock::duration Reset()