XRootD
XrdThrottleManager.cc
Go to the documentation of this file.
1 
2 #include "XrdThrottleManager.hh"
3 
4 #include "XrdOuc/XrdOucEnv.hh"
5 #include "XrdSec/XrdSecEntity.hh"
8 #include "XrdSys/XrdSysTimer.hh"
12 
13 #define XRD_TRACE m_trace->
14 
15 #include "INIReader.h"
17 
18 #include <algorithm>
19 #include <array>
20 #include <cmath>
21 #include <random>
22 #include <sstream>
23 
24 #if defined(__linux__)
25 
26 #include <sched.h>
27 unsigned XrdThrottleManager::GetTimerListHash() {
28  int cpu = sched_getcpu();
29  if (cpu < 0) {
30  return 0;
31  }
32  return cpu % m_timer_list_size;
33 }
34 
35 #else
36 
37 unsigned XrdThrottleManager::GetTimerListHash() {
38  return 0;
39 }
40 
41 #endif
42 
43 const char *
44 XrdThrottleManager::TraceID = "ThrottleManager";
45 
47  m_trace(tP),
48  m_log(lP),
49  m_interval_length_seconds(1.0),
50  m_bytes_per_second(-1),
51  m_ops_per_second(-1),
52  m_concurrency_limit(-1),
53  m_last_round_allocation(100*1024),
54  m_loadshed_host(""),
55  m_loadshed_port(0),
56  m_loadshed_frequency(0)
57 {
58 }
59 
60 void
62 {
63 
64  auto max_open = config.GetMaxOpen();
65  if (max_open != -1) SetMaxOpen(max_open);
66  auto max_conn = config.GetMaxConn();
67  if (max_conn != -1) SetMaxConns(max_conn);
68  auto max_wait = config.GetMaxWait();
69  if (max_wait != -1) SetMaxWait(max_wait);
70 
72  config.GetThrottleIOPSRate(),
73  config.GetThrottleConcurrency(),
74  static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);
75 
76  m_trace->What = config.GetTraceLevels();
77 
78  auto loadshed_host = config.GetLoadshedHost();
79  auto loadshed_port = config.GetLoadshedPort();
80  auto loadshed_freq = config.GetLoadshedFreq();
81  if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
82  {
83  // Loadshed specified, so set it.
84  SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
85  }
86 
87  // Load per-user configuration if specified
88  auto user_config_file = config.GetUserConfigFile();
89  if (!user_config_file.empty())
90  {
91  m_user_config_file = user_config_file;
92  if (LoadUserLimits(user_config_file) != 0)
93  {
94  m_log->Emsg("ThrottleManager", "Failed to load per-user configuration file", user_config_file.c_str());
95  }
96  }
97 }
98 
99 void
101 {
102  TRACE(DEBUG, "Initializing the throttle manager.");
103  // Initialize all our shares to zero.
104  m_primary_bytes_shares.resize(m_max_users);
105  m_secondary_bytes_shares.resize(m_max_users);
106  m_primary_ops_shares.resize(m_max_users);
107  m_secondary_ops_shares.resize(m_max_users);
108  for (auto & waiter : m_waiter_info) {
109  waiter.m_manager = this;
110  }
111 
112  // Allocate each user 100KB and 10 ops to bootstrap;
113  for (int i=0; i<m_max_users; i++)
114  {
115  m_primary_bytes_shares[i] = m_last_round_allocation;
116  m_secondary_bytes_shares[i] = 0;
117  m_primary_ops_shares[i] = 10;
118  m_secondary_ops_shares[i] = 0;
119  }
120 
121  int rc;
122  pthread_t tid;
123  if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
124  m_log->Emsg("ThrottleManager", rc, "create throttle thread");
125 
126 }
127 
128 std::tuple<std::string, uint16_t>
130  // client can be null, if so, return nobody
131  if (!client) {
132  return std::make_tuple("nobody", GetUid("nobody"));
133  }
134 
135  // Try various potential "names" associated with the request, from the most
136  // specific to most generic.
137  std::string user;
138 
139  if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
140  if (client->vorg) user = std::string(client->vorg) + ":" + user;
141  } else if (client->eaAPI) {
142  std::string request_name;
143  if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
144  }
145  if (user.empty()) {user = client->name ? client->name : "nobody";}
146  uint16_t uid = GetUid(user.c_str());
147  return std::make_tuple(user, uid);
148 }
149 
150 /*
151  * Take as many shares as possible to fulfill the request; update
152  * request with current remaining value, or zero if satisfied.
153  */
154 inline void
155 XrdThrottleManager::GetShares(int &shares, int &request)
156 {
157  int remaining;
158  AtomicFSub(remaining, shares, request);
159  if (remaining > 0)
160  {
161  request -= (remaining < request) ? remaining : request;
162  }
163 }
164 
165 /*
166  * Iterate through all of the secondary shares, attempting
167  * to steal enough to fulfill the request.
168  */
169 void
170 XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
171 {
172  if (!reqsize && !reqops) return;
173  TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
174  TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");
175 
176  for (int i=uid+1; i % m_max_users == uid; i++)
177  {
178  if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
179  if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
180  }
181 
182  TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
183  TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
184 }
185 
186 /*
187  * Increment the number of files held open by a given entity. Returns false
188  * if the user is at the maximum; in this case, the internal counter is not
189  * incremented.
190  */
191 bool
192 XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
193 {
194  // Get per-user connection limit (0 means use global)
195  unsigned long user_max_conn = GetUserMaxConn(entity);
196  unsigned long effective_max_conn = user_max_conn < m_max_conns ? user_max_conn : m_max_conns;
197 
198  if (m_max_open == 0 && effective_max_conn == 0) return true;
199 
200  const std::lock_guard<std::mutex> lock(m_file_mutex);
201  auto iter = m_file_counters.find(entity);
202  unsigned long cur_open_files = 0, cur_open_conns;
203  if (m_max_open) {
204  if (iter == m_file_counters.end()) {
205  m_file_counters[entity] = 1;
206  TRACE(FILES, "User " << entity << " has opened their first file");
207  cur_open_files = 1;
208  } else if (iter->second < m_max_open) {
209  iter->second++;
210  cur_open_files = iter->second;
211  } else {
212  std::stringstream ss;
213  ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
214  TRACE(FILES, ss.str());
215  error_message = ss.str();
216  return false;
217  }
218  }
219 
220  if (effective_max_conn > 0) {
221  auto pid = XrdSysThread::Num();
222  auto conn_iter = m_active_conns.find(entity);
223  auto conn_count_iter = m_conn_counters.find(entity);
224  if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == effective_max_conn) &&
225  (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
226  {
227  // note: we are rolling back the increment in open files
228  if (m_max_open) iter->second--;
229  std::stringstream ss;
230  ss << "User " << entity << " has hit the limit of " << effective_max_conn <<
231  " open connections";
232  if (user_max_conn > 0) {
233  ss << " (per-user limit)";
234  }
235  TRACE(CONNS, ss.str());
236  error_message = ss.str();
237  return false;
238  }
239  if (conn_iter == m_active_conns.end()) {
240  std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
241  new std::unordered_map<pid_t, unsigned long>());
242  (*conn_map)[pid] = 1;
243  m_active_conns[entity] = std::move(conn_map);
244  if (conn_count_iter == m_conn_counters.end()) {
245  m_conn_counters[entity] = 1;
246  cur_open_conns = 1;
247  } else {
248  m_conn_counters[entity] ++;
249  cur_open_conns = m_conn_counters[entity];
250  }
251  } else {
252  auto pid_iter = conn_iter->second->find(pid);
253  if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
254  (*(conn_iter->second))[pid] = 1;
255  conn_count_iter->second++;
256  cur_open_conns = conn_count_iter->second;
257  } else {
258  (*(conn_iter->second))[pid] ++;
259  cur_open_conns = conn_count_iter->second;
260  }
261  }
262  TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections (limit: " << effective_max_conn << ")");
263  }
264  if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
265  return true;
266 }
267 
268 
269 /*
270  * Decrement the number of files held open by a given entity.
271  *
272  * Returns false if the value would have fallen below zero or
273  * if the entity isn't tracked.
274  */
275 bool
276 XrdThrottleManager::CloseFile(const std::string &entity)
277 {
278  if (m_max_open == 0 && m_max_conns == 0) return true;
279 
280  bool result = true;
281  const std::lock_guard<std::mutex> lock(m_file_mutex);
282  if (m_max_open) {
283  auto iter = m_file_counters.find(entity);
284  if (iter == m_file_counters.end()) {
285  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
286  result = false;
287  } else if (iter->second == 0) {
288  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
289  result = false;
290  } else {
291  iter->second--;
292  }
293  if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
294  " remain open");
295  }
296 
297  if (m_max_conns) {
298  auto pid = XrdSysThread::Num();
299  auto conn_iter = m_active_conns.find(entity);
300  auto conn_count_iter = m_conn_counters.find(entity);
301  if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
302  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
303  " tracking");
304  return false;
305  }
306  auto pid_iter = conn_iter->second->find(pid);
307  if (pid_iter == conn_iter->second->end()) {
308  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
309  " tracking");
310  return false;
311  }
312  if (pid_iter->second == 0) {
313  TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
314  " plugin thinks was idle");
315  } else {
316  pid_iter->second--;
317  }
318  if (conn_count_iter == m_conn_counters.end()) {
319  TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
320  " observed an open file");
321  } else if (pid_iter->second == 0) {
322  if (conn_count_iter->second == 0) {
323  TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
324  " throttle plugin already thought all connections were idle");
325  } else {
326  conn_count_iter->second--;
327  TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
328  << conn_count_iter->second << " active connections remain");
329  }
330  }
331  }
332 
333  return result;
334 }
335 
336 
337 /*
338  * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
339  * this applies the limits as best possible, stalling the thread if necessary.
340  */
341 void
342 XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
343 {
344  if (m_bytes_per_second < 0)
345  reqsize = 0;
346  if (m_ops_per_second < 0)
347  reqops = 0;
348  while (reqsize || reqops)
349  {
350  // Subtract the requested out of the shares
351  AtomicBeg(m_compute_var);
352  GetShares(m_primary_bytes_shares[uid], reqsize);
353  if (reqsize)
354  {
355  TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
356  GetShares(m_secondary_bytes_shares[uid], reqsize);
357  TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
358  }
359  else
360  {
361  TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
362  }
363  GetShares(m_primary_ops_shares[uid], reqops);
364  if (reqops)
365  {
366  GetShares(m_secondary_ops_shares[uid], reqops);
367  }
368  StealShares(uid, reqsize, reqops);
369  AtomicEnd(m_compute_var);
370 
371  if (reqsize || reqops)
372  {
373  if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
374  if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
375  m_compute_var.Wait();
376  m_loadshed_limit_hit++;
377  }
378  }
379 
380 }
381 
382 void
383 XrdThrottleManager::UserIOAccounting()
384 {
385  std::chrono::steady_clock::duration::rep total_active_time = 0;
386  for (size_t idx = 0; idx < m_timer_list.size(); idx++) {
387  auto &timerList = m_timer_list[idx];
388  std::unique_lock<std::mutex> lock(timerList.m_mutex);
389  auto timer = timerList.m_first;
390  while (timer) {
391  auto next = timer->m_next;
392  auto uid = timer->m_owner;
393  auto &waiter = m_waiter_info[uid];
394  auto recent_duration = timer->Reset();
395  waiter.m_io_time += recent_duration.count();
396 
397  total_active_time += recent_duration.count();
398  timer = next;
399  }
400  }
401  m_io_active_time += total_active_time;
402 }
403 
404 void
405 XrdThrottleManager::ComputeWaiterOrder()
406 {
407  // Update the IO time for long-running I/O operations. This prevents,
408  // for example, a 2-minute I/O operation from causing a spike in
409  // concurrency because it's otherwise only reported at the end.
410  UserIOAccounting();
411 
412  auto now = std::chrono::steady_clock::now();
413  auto elapsed = now - m_last_waiter_recompute_time;
414  m_last_waiter_recompute_time = now;
415  std::chrono::duration<double> elapsed_secs = elapsed;
416  // Alpha is the decay factor for the exponential moving average. One window is 10 seconds,
417  // so every 10 seconds we decay the prior average by 1/e (that is, the weight is 64% of the
418  // total). This means the contribution of I/O load from a minute ago is 0.2% of the total.
419 
420  // The moving average will be used to determine how close the user is to their "fair share"
421  // of the concurrency limit among the users that are waiting.
422  auto alpha = 1 - std::exp(-1 * elapsed_secs.count() / 10.0);
423 
424  std::vector<double> share;
425  share.resize(m_max_users);
426  size_t users_with_waiters = 0;
427  // For each user, compute their current concurrency and determine how many waiting users
428  // total there are.
429  for (int i = 0; i < m_max_users; i++)
430  {
431  auto &waiter = m_waiter_info[i];
432  auto io_duration_rep = waiter.m_io_time.exchange(std::chrono::steady_clock::duration(0).count());
433  std::chrono::steady_clock::duration io_duration = std::chrono::steady_clock::duration(io_duration_rep);
434  std::chrono::duration<double> io_duration_secs = io_duration;
435  auto prev_concurrency = io_duration_secs.count() / elapsed_secs.count();
436  float new_concurrency = waiter.m_concurrency;
437 
438  new_concurrency = (1 - alpha) * new_concurrency + alpha * prev_concurrency;
439  waiter.m_concurrency = new_concurrency;
440  if (new_concurrency > 0) {
441  TRACE(DEBUG, "User " << i << " has concurrency of " << new_concurrency);
442  }
443  unsigned waiting;
444  {
445  std::lock_guard<std::mutex> lock(waiter.m_mutex);
446  waiting = waiter.m_waiting;
447  }
448  if (waiting > 0)
449  {
450  share[i] = new_concurrency;
451  TRACE(DEBUG, "User " << i << " has concurrency of " << share[i] << " and is waiting for " << waiting);
452  // Handle the division-by-zero case; if we have no history of usage whatsoever, we should pretend we
453  // have at least some minimal load
454  if (share[i] == 0) {
455  share[i] = 0.1;
456  }
457  users_with_waiters++;
458  }
459  else
460  {
461  share[i] = 0;
462  }
463  }
464  auto fair_share = static_cast<double>(m_concurrency_limit) / static_cast<double>(users_with_waiters);
465  std::vector<uint16_t> waiter_order;
466  waiter_order.resize(m_max_users);
467 
468  // Calculate the share for each user. We assume the user should get a share proportional to how
469  // far above or below the fair share they are. So, a user with concurrency of 20 when the fairshare
470  // is 10 will get 0.5 shares; a user with concurrency of 5 when the fairshare is 10 will get 2.0 shares.
471  double shares_sum = 0;
472  for (int idx = 0; idx < m_max_users; idx++)
473  {
474  if (share[idx]) {
475  shares_sum += fair_share / share[idx];
476  }
477  }
478 
479  // We must quantize the overall shares into an array of 1024 elements. We do this by
480  // scaling up (or down) based on the total number of shares computed above. Note this
481  // quantization can lead to an over-provisioned user being assigned zero shares; thus,
482  // we scale based on (1024-#users) so we can give one extra share to each user.
483  auto scale_factor = (static_cast<double>(m_max_users) - static_cast<double>(users_with_waiters)) / shares_sum;
484  size_t offset = 0;
485  for (int uid = 0; uid < m_max_users; uid++) {
486  if (share[uid] > 0) {
487  auto shares = static_cast<unsigned>(scale_factor * fair_share / share[uid]) + 1;
488  TRACE(DEBUG, "User " << uid << " has " << shares << " shares");
489  for (unsigned idx = 0; idx < shares; idx++)
490  {
491  waiter_order[offset % m_max_users] = uid;
492  offset++;
493  }
494  }
495  }
496  if (offset < m_max_users) {
497  for (size_t idx = offset; idx < m_max_users; idx++) {
498  waiter_order[idx] = -1;
499  }
500  }
501  // Shuffle the order to randomize the wakeup order.
502  std::shuffle(waiter_order.begin(), waiter_order.end(), std::default_random_engine());
503 
504  // Copy the order to the inactive array. We do not shuffle in-place because RAtomics are
505  // not move constructible, which is a requirement for std::shuffle.
506  auto &waiter_order_to_modify = (m_wake_order_active == 0) ? m_wake_order_1 : m_wake_order_0;
507  std::copy(waiter_order.begin(), waiter_order.end(), waiter_order_to_modify.begin());
508 
509  // Set the array we just modified to be the active one. Since this is a relaxed write, it could take
510  // some time for other CPUs to see the change; that's OK as this is all stochastic anyway.
511  m_wake_order_active = (m_wake_order_active + 1) % 2;
512 
513  m_waiter_offset = 0;
514 
515  // If we find ourselves below the concurrency limit because we woke up too few operations in the last
516  // interval, try waking up enough operations to fill the gap. If we race with new incoming operations,
517  // the threads will just go back to sleep.
518  if (users_with_waiters) {
519  m_waiting_users = users_with_waiters;
520  auto io_active = m_io_active.load(std::memory_order_acquire);
521  for (size_t idx = io_active; idx < static_cast<size_t>(m_concurrency_limit); idx++) {
522  NotifyOne();
523  }
524  }
525 }
526 
527 void *
528 XrdThrottleManager::RecomputeBootstrap(void *instance)
529 {
530  XrdThrottleManager * manager = static_cast<XrdThrottleManager*>(instance);
531  manager->Recompute();
532  return NULL;
533 }
534 
535 void
536 XrdThrottleManager::Recompute()
537 {
538  while (1)
539  {
540  // The connection counter can accumulate a number of known-idle connections.
541  // We only need to keep long-term memory of idle ones. Take this chance to garbage
542  // collect old connection counters.
543  if (m_max_open || m_max_conns) {
544  const std::lock_guard<std::mutex> lock(m_file_mutex);
545  for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
546  {
547  auto & conn_count = *iter;
548  if (!conn_count.second) {
549  iter = m_active_conns.erase(iter);
550  continue;
551  }
552  for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
553  if (iter2->second == 0) {
554  iter2 = conn_count.second->erase(iter2);
555  } else {
556  iter2++;
557  }
558  }
559  if (!conn_count.second->size()) {
560  iter = m_active_conns.erase(iter);
561  } else {
562  iter++;
563  }
564  }
565  for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
566  if (!iter->second) {
567  iter = m_conn_counters.erase(iter);
568  } else {
569  iter++;
570  }
571  }
572  for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
573  if (!iter->second) {
574  iter = m_file_counters.erase(iter);
575  } else {
576  iter++;
577  }
578  }
579  }
580 
581  TRACE(DEBUG, "Recomputing fairshares for throttle.");
582  RecomputeInternal();
583  ComputeWaiterOrder();
584  TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
585  XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
586  }
587 }
588 
589 /*
590  * The heart of the manager approach.
591  *
592  * This routine periodically recomputes the shares of each current user.
593  * Each user has a "primary" and a "secondary" share. At the end of the
594  * each time interval, the remaining primary share is moved to secondary.
595  * A user can utilize both shares; if both are gone, they must block until
596  * the next recompute interval.
597  *
598  * The secondary share can be "stolen" by any other user; so, if a user
599  * is idle or under-utilizing, their share can be used by someone else.
600  * However, they can never be completely starved, as no one can steal
601  * primary share.
602  *
603  * In this way, we violate the throttle for an interval, but never starve.
604  *
605  */
606 void
607 XrdThrottleManager::RecomputeInternal()
608 {
609  // Compute total shares for this interval;
610  float intervals_per_second = 1.0/m_interval_length_seconds;
611  float total_bytes_shares = m_bytes_per_second / intervals_per_second;
612  float total_ops_shares = m_ops_per_second / intervals_per_second;
613 
614  // Compute the number of active users; a user is active if they used
615  // any primary share during the last interval;
616  AtomicBeg(m_compute_var);
617  float active_users = 0;
618  long bytes_used = 0;
619  for (int i=0; i<m_max_users; i++)
620  {
621  int primary = AtomicFAZ(m_primary_bytes_shares[i]);
622  if (primary != m_last_round_allocation)
623  {
624  active_users++;
625  if (primary >= 0)
626  m_secondary_bytes_shares[i] = primary;
627  primary = AtomicFAZ(m_primary_ops_shares[i]);
628  if (primary >= 0)
629  m_secondary_ops_shares[i] = primary;
630  bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
631  }
632  }
633 
634  if (active_users == 0)
635  {
636  active_users++;
637  }
638 
639  // Note we allocate the same number of shares to *all* users, not
640  // just the active ones. If a new user becomes active in the next
641  // interval, we'll go over our bandwidth budget just a bit.
642  m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
643  int ops_shares = static_cast<int>(total_ops_shares / active_users);
644  TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
645  TRACE(IOPS, "Round ops allocation " << ops_shares);
646  for (int i=0; i<m_max_users; i++)
647  {
648  m_primary_bytes_shares[i] = m_last_round_allocation;
649  m_primary_ops_shares[i] = ops_shares;
650  }
651 
652  AtomicEnd(m_compute_var);
653 
654  // Reset the loadshed limit counter.
655  int limit_hit = m_loadshed_limit_hit.exchange(0);
656  TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");
657 
658  // Update the IO counters
659  m_compute_var.Lock();
660  m_stable_io_active = m_io_active.load(std::memory_order_acquire);
661  auto io_active = m_stable_io_active;
662  m_stable_io_total = m_io_total;
663  auto io_total = m_stable_io_total;
664  auto io_wait_rep = m_io_active_time.exchange(std::chrono::steady_clock::duration(0).count());
665  m_stable_io_wait += std::chrono::steady_clock::duration(io_wait_rep);
666 
667  m_compute_var.UnLock();
668 
669  auto io_wait_ms = std::chrono::duration_cast<std::chrono::milliseconds>(m_stable_io_wait).count();
670  TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO active time is " << io_wait_ms << "ms.");
671  if (m_gstream)
672  {
673  char buf[128];
674  auto len = snprintf(buf, 128,
675  R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%llu})",
676  static_cast<double>(io_wait_ms) / 1000.0, io_active, static_cast<long long unsigned>(io_total));
677  auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
678  if (!suc)
679  {
680  TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
681  }
682  }
683  m_compute_var.Broadcast();
684 }
685 
686 /*
687  * Do a simple hash across the username.
688  */
689 uint16_t
690 XrdThrottleManager::GetUid(const std::string &username)
691 {
692  std::hash<std::string> hash_fn;
693  auto hash = hash_fn(username);
694  auto uid = static_cast<uint16_t>(hash % m_max_users);
695  TRACE(DEBUG, "Mapping user " << username << " to UID " << uid);
696  return uid;
697 }
698 
699 /*
700  * Notify a single waiter thread that it can proceed.
701  */
702 void
703 XrdThrottleManager::NotifyOne()
704 {
705  auto &wake_order = (m_wake_order_active == 0) ? m_wake_order_0 : m_wake_order_1;
706 
707  for (size_t idx = 0; idx < m_max_users; ++idx)
708  {
709  auto offset = m_waiter_offset.fetch_add(1, std::memory_order_acq_rel);
710  int16_t uid = wake_order[offset % m_max_users];
711  if (uid < 0)
712  {
713  continue;
714  }
715  auto &waiter_info = m_waiter_info[uid];
716  std::unique_lock<std::mutex> lock(waiter_info.m_mutex);
717  if (waiter_info.m_waiting) {
718  waiter_info.NotifyOne(std::move(lock));
719  return;
720  }
721  }
722 }
723 
724 /*
725  * Create an IO timer object; increment the number of outstanding IOs.
726  */
728 XrdThrottleManager::StartIOTimer(uint16_t uid, bool &ok)
729 {
730  int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
731  m_io_total++;
732 
733  while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
734  {
735  // If the user has essentially no concurrency, then we let them
736  // temporarily exceed the limit. This prevents potential waits for
737  // every single read for an infrequent user.
738  if (m_waiter_info[uid].m_concurrency < 1)
739  {
740  break;
741  }
742  m_loadshed_limit_hit++;
743  m_io_active.fetch_sub(1, std::memory_order_acq_rel);
744  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
745  ok = m_waiter_info[uid].Wait();
746  if (!ok) {
747  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
748  return XrdThrottleTimer();
749  }
750  cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
751  }
752 
753  ok = true;
754  return XrdThrottleTimer(this, uid);
755 }
756 
757 /*
758  * Finish recording an IO timer.
759  */
760 void
761 XrdThrottleManager::StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid)
762 {
763  m_io_active_time += event_duration.count();
764  auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
765  m_waiter_info[uid].m_io_time += event_duration.count();
766  if (old_active == static_cast<unsigned>(m_concurrency_limit))
767  {
768  // If we are below the concurrency limit threshold and have another waiter
769  // for our user, then execute it immediately. Otherwise, we will give
770  // someone else a chance to run (as we have gotten more than our share recently).
771  unsigned waiting_users = m_waiting_users;
772  if (waiting_users == 0) waiting_users = 1;
773  if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
774  {
775  std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
776  if (m_waiter_info[uid].m_waiting > 0)
777  {
778  m_waiter_info[uid].NotifyOne(std::move(lock));
779  return;
780  }
781  }
782  NotifyOne();
783  }
784 }
785 
786 /*
787  * Check the counters to see if we have hit any throttle limits in the
788  * current time period. If so, shed the client randomly.
789  *
790  * If the client has already been load-shedded once and reconnected to this
791  * server, then do not load-shed it again.
792  */
793 bool
794 XrdThrottleManager::CheckLoadShed(const std::string &opaque)
795 {
796  if (m_loadshed_port == 0)
797  {
798  return false;
799  }
800  if (m_loadshed_limit_hit == 0)
801  {
802  return false;
803  }
804  if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
805  {
806  return false;
807  }
808  if (opaque.empty())
809  {
810  return false;
811  }
812  return true;
813 }
814 
815 void
816 XrdThrottleManager::PrepLoadShed(const char * opaque, std::string &lsOpaque)
817 {
818  if (m_loadshed_port == 0)
819  {
820  return;
821  }
822  if (opaque && opaque[0])
823  {
824  XrdOucEnv env(opaque);
825  // Do not load shed client if it has already been done once.
826  if (env.Get("throttle.shed") != 0)
827  {
828  return;
829  }
830  lsOpaque = opaque;
831  lsOpaque += "&throttle.shed=1";
832  }
833  else
834  {
835  lsOpaque = "throttle.shed=1";
836  }
837 }
838 
839 void
840 XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
841 {
842  host = m_loadshed_host;
843  host += "?";
844  host += opaque;
845  port = m_loadshed_port;
846 }
847 
848 bool
849 XrdThrottleManager::Waiter::Wait()
850 {
851  auto timeout = std::chrono::steady_clock::now() + m_manager->m_max_wait_time;
852  {
853  std::unique_lock<std::mutex> lock(m_mutex);
854  m_waiting++;
855  m_cv.wait_until(lock, timeout,
856  [&] { return m_manager->m_io_active.load(std::memory_order_acquire) < static_cast<unsigned>(m_manager->m_concurrency_limit) || std::chrono::steady_clock::now() >= timeout; });
857  m_waiting--;
858  }
859  if (std::chrono::steady_clock::now() > timeout) {
860  return false;
861  }
862  return true;
863 }
864 
865 /*
866  * Load per-user limits from an INI-style configuration file.
867  * Format:
868  * [default]
869  * name = *
870  * maxconn = 200
871  *
872  * [user1]
873  * name = user1
874  * maxconn = 25
875  *
876  * [wildcarduser]
877  * name = wildcarduser*
878  * maxconn = 10
879  */
880 int
881 XrdThrottleManager::LoadUserLimits(const std::string &config_file)
882 {
883  INIReader reader(config_file);
884  if (reader.ParseError() < 0)
885  {
886  m_log->Emsg("ThrottleManager", errno, "Unable to open per-user configuration file", config_file.c_str());
887  return 1;
888  }
889  else if (reader.ParseError() > 0)
890  {
891  std::stringstream ss;
892  ss << "Parse error on line " << reader.ParseError() << " of file " << config_file;
893  m_log->Emsg("ThrottleManager", ss.str().c_str());
894  return 1;
895  }
896 
897  std::unordered_map<std::string, UserLimit> new_limits;
898 
899  // Process all sections
900  for (const auto &section : reader.Sections())
901  {
902  // Get the name parameter (required for all sections)
903  std::string name = reader.Get(section, "name", "");
904  if (name.empty())
905  {
906  m_log->Say("ThrottleManager", "Section", section.c_str(), "missing 'name' parameter; skipping");
907  continue;
908  }
909 
910  long max_conn = reader.GetInteger(section, "maxconn", 0);
911  if (max_conn <= 0)
912  {
913  m_log->Say("ThrottleManager", "Section", section.c_str(), "has invalid or missing 'maxconn' parameter; skipping");
914  continue;
915  }
916 
917  UserLimit limit;
918  limit.max_conn = static_cast<unsigned long>(max_conn);
919  // Check if name contains wildcard (including '*' for default/catch-all)
920  limit.is_wildcard = (name.find('*') != std::string::npos);
921  new_limits[name] = limit;
922  }
923 
924  // Atomically replace the limits map
925  size_t num_entries = new_limits.size();
926  {
927  std::unique_lock<std::shared_mutex> lock(m_user_limits_mutex);
928  m_user_limits = std::move(new_limits);
929  }
930 
931  m_log->Say("ThrottleManager", "Loaded", std::to_string(num_entries).c_str(), "per-user limit entries from", config_file.c_str());
932  return 0;
933 }
934 
935 /*
936  * Reload per-user limits from the configured file.
937  */
938 int
940 {
941  if (m_user_config_file.empty())
942  {
943  m_log->Emsg("ThrottleManager", "No per-user configuration file specified");
944  return 1;
945  }
946  return LoadUserLimits(m_user_config_file);
947 }
948 
949 /*
950  * Get the per-user connection limit for a given username.
951  * Returns 0 if no per-user limit is set (use global), otherwise returns the limit.
952  * Supports wildcard matching (e.g., "user*" matches "user1", "user2", etc.)
953  * Special case: "*" matches all users (default/catch-all)
954  * Priority: exact match > wildcard match (longest prefix) > "*" > global
955  */
956 unsigned long
957 XrdThrottleManager::GetUserMaxConn(const std::string &username)
958 {
959  std::shared_lock lock(m_user_limits_mutex);
960 
961  // First, try exact match
962  auto exact_iter = m_user_limits.find(username);
963  if (exact_iter != m_user_limits.end() && !exact_iter->second.is_wildcard)
964  {
965  return exact_iter->second.max_conn;
966  }
967 
968  // Then, try wildcard matches (prefer longest matching prefix)
969  unsigned long best_match = 0;
970  size_t best_prefix_len = 0;
971  unsigned long catch_all_match = 0;
972 
973  for (const auto &entry : m_user_limits)
974  {
975  if (!entry.second.is_wildcard) continue;
976 
977  const std::string &pattern = entry.first;
978 
979  // Special case: "*" is a catch-all pattern - store it but don't use it yet
980  if (pattern == "*")
981  {
982  catch_all_match = entry.second.max_conn;
983  continue;
984  }
985 
986  size_t wildcard_pos = pattern.find('*');
987  if (wildcard_pos == std::string::npos) continue;
988 
989  // Extract prefix before wildcard
990  std::string prefix = pattern.substr(0, wildcard_pos);
991  if (username.length() >= prefix.length() &&
992  username.substr(0, prefix.length()) == prefix)
993  {
994  // Prefer longer prefix matches
995  if (prefix.length() > best_prefix_len)
996  {
997  best_prefix_len = prefix.length();
998  best_match = entry.second.max_conn;
999  }
1000  }
1001  }
1002 
1003  // If we found a specific wildcard match, use it
1004  if (best_match > 0) return best_match;
1005 
1006  // If no specific wildcard match, use catch-all if available
1007  // return 0 if not
1008  return catch_all_match;
1009 
1010 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define AtomicFSub(w, x, y)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
Definition: XrdSecEntity.hh:71
XrdSecEntityAttr * eaAPI
non-const API to attributes
Definition: XrdSecEntity.hh:92
char * name
Entity's name.
Definition: XrdSecEntity.hh:69
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:162
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
unsigned long GetUserMaxConn(const std::string &username)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const
const std::string & GetUserConfigFile() const
bool Insert(const char *data, int dlen)