XRootD
XrdThrottleManager Class Reference

#include <XrdThrottleManager.hh>

+ Collaboration diagram for XrdThrottleManager:

Public Member Functions

 XrdThrottleManager (XrdSysError *lP, XrdOucTrace *tP)
 
 ~XrdThrottleManager ()
 
void Apply (int reqsize, int reqops, int uid)
 
bool CheckLoadShed (const std::string &opaque)
 
bool CloseFile (const std::string &entity)
 
void FromConfig (XrdThrottle::Configuration &config)
 
std::tuple< std::string, uint16_t > GetUserInfo (const XrdSecEntity *client)
 
unsigned long GetUserMaxConn (const std::string &username)
 
void Init ()
 
bool IsThrottling ()
 
int LoadUserLimits (const std::string &config_file)
 
bool OpenFile (const std::string &entity, std::string &open_error_message)
 
void PerformLoadShed (const std::string &opaque, std::string &host, unsigned &port)
 
void PrepLoadShed (const char *opaque, std::string &lsOpaque)
 
int ReloadUserLimits ()
 
void SetLoadShed (std::string &hostname, unsigned port, unsigned frequency)
 
void SetMaxConns (unsigned long max_conns)
 
void SetMaxOpen (unsigned long max_open)
 
void SetMaxWait (unsigned long max_wait)
 
void SetMonitor (XrdXrootdGStream *gstream)
 
void SetThrottles (float reqbyterate, float reqoprate, int concurrency, float interval_length)
 
XrdThrottleTimer StartIOTimer (uint16_t uid, bool &ok)
 

Protected Member Functions

void StopIOTimer (std::chrono::steady_clock::duration &event_duration, uint16_t uid)
 

Friends

class XrdThrottleTimer
 

Detailed Description

Definition at line 54 of file XrdThrottleManager.hh.

Constructor & Destructor Documentation

◆ XrdThrottleManager()

XrdThrottleManager::XrdThrottleManager ( XrdSysError lP,
XrdOucTrace tP 
)

Definition at line 46 of file XrdThrottleManager.cc.

46  :
47  m_trace(tP),
48  m_log(lP),
49  m_interval_length_seconds(1.0),
50  m_bytes_per_second(-1),
51  m_ops_per_second(-1),
52  m_concurrency_limit(-1),
53  m_last_round_allocation(100*1024),
54  m_loadshed_host(""),
55  m_loadshed_port(0),
56  m_loadshed_frequency(0)
57 {
58 }

◆ ~XrdThrottleManager()

XrdThrottleManager::~XrdThrottleManager ( )
inline

Definition at line 119 of file XrdThrottleManager.hh.

119 {} // The buffmanager is never deleted

Member Function Documentation

◆ Apply()

void XrdThrottleManager::Apply ( int  reqsize,
int  reqops,
int  uid 
)

Definition at line 342 of file XrdThrottleManager.cc.

343 {
344  if (m_bytes_per_second < 0)
345  reqsize = 0;
346  if (m_ops_per_second < 0)
347  reqops = 0;
348  while (reqsize || reqops)
349  {
350  // Subtract the requested out of the shares
351  AtomicBeg(m_compute_var);
352  GetShares(m_primary_bytes_shares[uid], reqsize);
353  if (reqsize)
354  {
355  TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
356  GetShares(m_secondary_bytes_shares[uid], reqsize);
357  TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
358  }
359  else
360  {
361  TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
362  }
363  GetShares(m_primary_ops_shares[uid], reqops);
364  if (reqops)
365  {
366  GetShares(m_secondary_ops_shares[uid], reqops);
367  }
368  StealShares(uid, reqsize, reqops);
369  AtomicEnd(m_compute_var);
370 
371  if (reqsize || reqops)
372  {
373  if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
374  if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
375  m_compute_var.Wait();
376  m_loadshed_limit_hit++;
377  }
378  }
379 
380 }
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition: XrdTrace.hh:63

References AtomicBeg, AtomicEnd, TRACE, and XrdSysCondVar::Wait().

+ Here is the call graph for this function:

◆ CheckLoadShed()

bool XrdThrottleManager::CheckLoadShed ( const std::string &  opaque)

Definition at line 794 of file XrdThrottleManager.cc.

795 {
796  if (m_loadshed_port == 0)
797  {
798  return false;
799  }
800  if (m_loadshed_limit_hit == 0)
801  {
802  return false;
803  }
804  if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
805  {
806  return false;
807  }
808  if (opaque.empty())
809  {
810  return false;
811  }
812  return true;
813 }

◆ CloseFile()

bool XrdThrottleManager::CloseFile ( const std::string &  entity)

Definition at line 276 of file XrdThrottleManager.cc.

277 {
278  if (m_max_open == 0 && m_max_conns == 0) return true;
279 
280  bool result = true;
281  const std::lock_guard<std::mutex> lock(m_file_mutex);
282  if (m_max_open) {
283  auto iter = m_file_counters.find(entity);
284  if (iter == m_file_counters.end()) {
285  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
286  result = false;
287  } else if (iter->second == 0) {
288  TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
289  result = false;
290  } else {
291  iter->second--;
292  }
293  if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
294  " remain open");
295  }
296 
297  if (m_max_conns) {
298  auto pid = XrdSysThread::Num();
299  auto conn_iter = m_active_conns.find(entity);
300  auto conn_count_iter = m_conn_counters.find(entity);
301  if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
302  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
303  " tracking");
304  return false;
305  }
306  auto pid_iter = conn_iter->second->find(pid);
307  if (pid_iter == conn_iter->second->end()) {
308  TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
309  " tracking");
310  return false;
311  }
312  if (pid_iter->second == 0) {
313  TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
314  " plugin thinks was idle");
315  } else {
316  pid_iter->second--;
317  }
318  if (conn_count_iter == m_conn_counters.end()) {
319  TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
320  " observed an open file");
321  } else if (pid_iter->second == 0) {
322  if (conn_count_iter->second == 0) {
323  TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
324  " throttle plugin already thought all connections were idle");
325  } else {
326  conn_count_iter->second--;
327  TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
328  << conn_count_iter->second << " active connections remain");
329  }
330  }
331  }
332 
333  return result;
334 }
static unsigned long Num(void)

References XrdSysThread::Num(), and TRACE.

+ Here is the call graph for this function:

◆ FromConfig()

void XrdThrottleManager::FromConfig ( XrdThrottle::Configuration config)

Definition at line 61 of file XrdThrottleManager.cc.

62 {
63 
64  auto max_open = config.GetMaxOpen();
65  if (max_open != -1) SetMaxOpen(max_open);
66  auto max_conn = config.GetMaxConn();
67  if (max_conn != -1) SetMaxConns(max_conn);
68  auto max_wait = config.GetMaxWait();
69  if (max_wait != -1) SetMaxWait(max_wait);
70 
72  config.GetThrottleIOPSRate(),
73  config.GetThrottleConcurrency(),
74  static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);
75 
76  m_trace->What = config.GetTraceLevels();
77 
78  auto loadshed_host = config.GetLoadshedHost();
79  auto loadshed_port = config.GetLoadshedPort();
80  auto loadshed_freq = config.GetLoadshedFreq();
81  if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
82  {
83  // Loadshed specified, so set it.
84  SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
85  }
86 
87  // Load per-user configuration if specified
88  auto user_config_file = config.GetUserConfigFile();
89  if (!user_config_file.empty())
90  {
91  m_user_config_file = user_config_file;
92  if (LoadUserLimits(user_config_file) != 0)
93  {
94  m_log->Emsg("ThrottleManager", "Failed to load per-user configuration file", user_config_file.c_str());
95  }
96  }
97 }
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const
const std::string & GetUserConfigFile() const

References XrdSysError::Emsg(), XrdThrottle::Configuration::GetLoadshedFreq(), XrdThrottle::Configuration::GetLoadshedHost(), XrdThrottle::Configuration::GetLoadshedPort(), XrdThrottle::Configuration::GetMaxConn(), XrdThrottle::Configuration::GetMaxOpen(), XrdThrottle::Configuration::GetMaxWait(), XrdThrottle::Configuration::GetThrottleConcurrency(), XrdThrottle::Configuration::GetThrottleDataRate(), XrdThrottle::Configuration::GetThrottleIOPSRate(), XrdThrottle::Configuration::GetThrottleRecomputeIntervalMS(), XrdThrottle::Configuration::GetTraceLevels(), XrdThrottle::Configuration::GetUserConfigFile(), LoadUserLimits(), SetLoadShed(), SetMaxConns(), SetMaxOpen(), SetMaxWait(), SetThrottles(), and XrdOucTrace::What.

+ Here is the call graph for this function:

◆ GetUserInfo()

std::tuple< std::string, uint16_t > XrdThrottleManager::GetUserInfo ( const XrdSecEntity client)

Definition at line 129 of file XrdThrottleManager.cc.

129  {
130  // client can be null, if so, return nobody
131  if (!client) {
132  return std::make_tuple("nobody", GetUid("nobody"));
133  }
134 
135  // Try various potential "names" associated with the request, from the most
136  // specific to most generic.
137  std::string user;
138 
139  if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
140  if (client->vorg) user = std::string(client->vorg) + ":" + user;
141  } else if (client->eaAPI) {
142  std::string request_name;
143  if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
144  }
145  if (user.empty()) {user = client->name ? client->name : "nobody";}
146  uint16_t uid = GetUid(user.c_str());
147  return std::make_tuple(user, uid);
148 }
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
Definition: XrdSecEntity.hh:71
XrdSecEntityAttr * eaAPI
non-const API to attributes
Definition: XrdSecEntity.hh:92
char * name
Entity's name.
Definition: XrdSecEntity.hh:69

References XrdSecEntity::eaAPI, XrdSecEntityAttr::Get(), XrdSecEntity::name, and XrdSecEntity::vorg.

+ Here is the call graph for this function:

◆ GetUserMaxConn()

unsigned long XrdThrottleManager::GetUserMaxConn ( const std::string &  username)

Definition at line 957 of file XrdThrottleManager.cc.

958 {
959  std::shared_lock lock(m_user_limits_mutex);
960 
961  // First, try exact match
962  auto exact_iter = m_user_limits.find(username);
963  if (exact_iter != m_user_limits.end() && !exact_iter->second.is_wildcard)
964  {
965  return exact_iter->second.max_conn;
966  }
967 
968  // Then, try wildcard matches (prefer longest matching prefix)
969  unsigned long best_match = 0;
970  size_t best_prefix_len = 0;
971  unsigned long catch_all_match = 0;
972 
973  for (const auto &entry : m_user_limits)
974  {
975  if (!entry.second.is_wildcard) continue;
976 
977  const std::string &pattern = entry.first;
978 
979  // Special case: "*" is a catch-all pattern - store it but don't use it yet
980  if (pattern == "*")
981  {
982  catch_all_match = entry.second.max_conn;
983  continue;
984  }
985 
986  size_t wildcard_pos = pattern.find('*');
987  if (wildcard_pos == std::string::npos) continue;
988 
989  // Extract prefix before wildcard
990  std::string prefix = pattern.substr(0, wildcard_pos);
991  if (username.length() >= prefix.length() &&
992  username.substr(0, prefix.length()) == prefix)
993  {
994  // Prefer longer prefix matches
995  if (prefix.length() > best_prefix_len)
996  {
997  best_prefix_len = prefix.length();
998  best_match = entry.second.max_conn;
999  }
1000  }
1001  }
1002 
1003  // If we found a specific wildcard match, use it
1004  if (best_match > 0) return best_match;
1005 
1006  // If no specific wildcard match, use catch-all if available
1007  // return 0 if not
1008  return catch_all_match;
1009 
1010 }

Referenced by OpenFile().

+ Here is the caller graph for this function:

◆ Init()

void XrdThrottleManager::Init ( )

Definition at line 100 of file XrdThrottleManager.cc.

101 {
102  TRACE(DEBUG, "Initializing the throttle manager.");
103  // Initialize all our shares to zero.
104  m_primary_bytes_shares.resize(m_max_users);
105  m_secondary_bytes_shares.resize(m_max_users);
106  m_primary_ops_shares.resize(m_max_users);
107  m_secondary_ops_shares.resize(m_max_users);
108  for (auto & waiter : m_waiter_info) {
109  waiter.m_manager = this;
110  }
111 
112  // Allocate each user 100KB and 10 ops to bootstrap;
113  for (int i=0; i<m_max_users; i++)
114  {
115  m_primary_bytes_shares[i] = m_last_round_allocation;
116  m_secondary_bytes_shares[i] = 0;
117  m_primary_ops_shares[i] = 10;
118  m_secondary_ops_shares[i] = 0;
119  }
120 
121  int rc;
122  pthread_t tid;
123  if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
124  m_log->Emsg("ThrottleManager", rc, "create throttle thread");
125 
126 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References DEBUG, XrdSysError::Emsg(), XrdSysThread::Run(), and TRACE.

+ Here is the call graph for this function:

◆ IsThrottling()

bool XrdThrottleManager::IsThrottling ( )
inline

Definition at line 70 of file XrdThrottleManager.hh.

70 {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}

◆ LoadUserLimits()

int XrdThrottleManager::LoadUserLimits ( const std::string &  config_file)

Definition at line 881 of file XrdThrottleManager.cc.

882 {
883  INIReader reader(config_file);
884  if (reader.ParseError() < 0)
885  {
886  m_log->Emsg("ThrottleManager", errno, "Unable to open per-user configuration file", config_file.c_str());
887  return 1;
888  }
889  else if (reader.ParseError() > 0)
890  {
891  std::stringstream ss;
892  ss << "Parse error on line " << reader.ParseError() << " of file " << config_file;
893  m_log->Emsg("ThrottleManager", ss.str().c_str());
894  return 1;
895  }
896 
897  std::unordered_map<std::string, UserLimit> new_limits;
898 
899  // Process all sections
900  for (const auto &section : reader.Sections())
901  {
902  // Get the name parameter (required for all sections)
903  std::string name = reader.Get(section, "name", "");
904  if (name.empty())
905  {
906  m_log->Say("ThrottleManager", "Section", section.c_str(), "missing 'name' parameter; skipping");
907  continue;
908  }
909 
910  long max_conn = reader.GetInteger(section, "maxconn", 0);
911  if (max_conn <= 0)
912  {
913  m_log->Say("ThrottleManager", "Section", section.c_str(), "has invalid or missing 'maxconn' parameter; skipping");
914  continue;
915  }
916 
917  UserLimit limit;
918  limit.max_conn = static_cast<unsigned long>(max_conn);
919  // Check if name contains wildcard (including '*' for default/catch-all)
920  limit.is_wildcard = (name.find('*') != std::string::npos);
921  new_limits[name] = limit;
922  }
923 
924  // Atomically replace the limits map
925  size_t num_entries = new_limits.size();
926  {
927  std::unique_lock<std::shared_mutex> lock(m_user_limits_mutex);
928  m_user_limits = std::move(new_limits);
929  }
930 
931  m_log->Say("ThrottleManager", "Loaded", std::to_string(num_entries).c_str(), "per-user limit entries from", config_file.c_str());
932  return 0;
933 }
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:162

References XrdSysError::Emsg(), and XrdSysError::Say().

Referenced by FromConfig(), and ReloadUserLimits().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OpenFile()

bool XrdThrottleManager::OpenFile ( const std::string &  entity,
std::string &  open_error_message 
)

Definition at line 192 of file XrdThrottleManager.cc.

193 {
194  // Get per-user connection limit (0 means use global)
195  unsigned long user_max_conn = GetUserMaxConn(entity);
196  unsigned long effective_max_conn = user_max_conn < m_max_conns ? user_max_conn : m_max_conns;
197 
198  if (m_max_open == 0 && effective_max_conn == 0) return true;
199 
200  const std::lock_guard<std::mutex> lock(m_file_mutex);
201  auto iter = m_file_counters.find(entity);
202  unsigned long cur_open_files = 0, cur_open_conns;
203  if (m_max_open) {
204  if (iter == m_file_counters.end()) {
205  m_file_counters[entity] = 1;
206  TRACE(FILES, "User " << entity << " has opened their first file");
207  cur_open_files = 1;
208  } else if (iter->second < m_max_open) {
209  iter->second++;
210  cur_open_files = iter->second;
211  } else {
212  std::stringstream ss;
213  ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
214  TRACE(FILES, ss.str());
215  error_message = ss.str();
216  return false;
217  }
218  }
219 
220  if (effective_max_conn > 0) {
221  auto pid = XrdSysThread::Num();
222  auto conn_iter = m_active_conns.find(entity);
223  auto conn_count_iter = m_conn_counters.find(entity);
224  if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == effective_max_conn) &&
225  (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
226  {
227  // note: we are rolling back the increment in open files
228  if (m_max_open) iter->second--;
229  std::stringstream ss;
230  ss << "User " << entity << " has hit the limit of " << effective_max_conn <<
231  " open connections";
232  if (user_max_conn > 0) {
233  ss << " (per-user limit)";
234  }
235  TRACE(CONNS, ss.str());
236  error_message = ss.str();
237  return false;
238  }
239  if (conn_iter == m_active_conns.end()) {
240  std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
241  new std::unordered_map<pid_t, unsigned long>());
242  (*conn_map)[pid] = 1;
243  m_active_conns[entity] = std::move(conn_map);
244  if (conn_count_iter == m_conn_counters.end()) {
245  m_conn_counters[entity] = 1;
246  cur_open_conns = 1;
247  } else {
248  m_conn_counters[entity] ++;
249  cur_open_conns = m_conn_counters[entity];
250  }
251  } else {
252  auto pid_iter = conn_iter->second->find(pid);
253  if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
254  (*(conn_iter->second))[pid] = 1;
255  conn_count_iter->second++;
256  cur_open_conns = conn_count_iter->second;
257  } else {
258  (*(conn_iter->second))[pid] ++;
259  cur_open_conns = conn_count_iter->second;
260  }
261  }
262  TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections (limit: " << effective_max_conn << ")");
263  }
264  if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
265  return true;
266 }
unsigned long GetUserMaxConn(const std::string &username)

References GetUserMaxConn(), XrdSysThread::Num(), and TRACE.

+ Here is the call graph for this function:

◆ PerformLoadShed()

void XrdThrottleManager::PerformLoadShed ( const std::string &  opaque,
std::string &  host,
unsigned &  port 
)

Definition at line 840 of file XrdThrottleManager.cc.

841 {
842  host = m_loadshed_host;
843  host += "?";
844  host += opaque;
845  port = m_loadshed_port;
846 }

◆ PrepLoadShed()

void XrdThrottleManager::PrepLoadShed ( const char *  opaque,
std::string &  lsOpaque 
)

Definition at line 816 of file XrdThrottleManager.cc.

817 {
818  if (m_loadshed_port == 0)
819  {
820  return;
821  }
822  if (opaque && opaque[0])
823  {
824  XrdOucEnv env(opaque);
825  // Do not load shed client if it has already been done once.
826  if (env.Get("throttle.shed") != 0)
827  {
828  return;
829  }
830  lsOpaque = opaque;
831  lsOpaque += "&throttle.shed=1";
832  }
833  else
834  {
835  lsOpaque = "throttle.shed=1";
836  }
837 }

References XrdOucEnv::Get().

+ Here is the call graph for this function:

◆ ReloadUserLimits()

int XrdThrottleManager::ReloadUserLimits ( )

Definition at line 939 of file XrdThrottleManager.cc.

940 {
941  if (m_user_config_file.empty())
942  {
943  m_log->Emsg("ThrottleManager", "No per-user configuration file specified");
944  return 1;
945  }
946  return LoadUserLimits(m_user_config_file);
947 }

References XrdSysError::Emsg(), and LoadUserLimits().

+ Here is the call graph for this function:

◆ SetLoadShed()

void XrdThrottleManager::SetLoadShed ( std::string &  hostname,
unsigned  port,
unsigned  frequency 
)
inline

Definition at line 81 of file XrdThrottleManager.hh.

82  {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxConns()

void XrdThrottleManager::SetMaxConns ( unsigned long  max_conns)
inline

Definition at line 86 of file XrdThrottleManager.hh.

86 {m_max_conns = max_conns;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxOpen()

void XrdThrottleManager::SetMaxOpen ( unsigned long  max_open)
inline

Definition at line 84 of file XrdThrottleManager.hh.

84 {m_max_open = max_open;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxWait()

void XrdThrottleManager::SetMaxWait ( unsigned long  max_wait)
inline

Definition at line 88 of file XrdThrottleManager.hh.

88 {m_max_wait_time = std::chrono::seconds(max_wait);}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMonitor()

void XrdThrottleManager::SetMonitor ( XrdXrootdGStream gstream)
inline

Definition at line 101 of file XrdThrottleManager.hh.

101 {m_gstream = gstream;}

◆ SetThrottles()

void XrdThrottleManager::SetThrottles ( float  reqbyterate,
float  reqoprate,
int  concurrency,
float  interval_length 
)
inline

Definition at line 77 of file XrdThrottleManager.hh.

78  {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
79  m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ StartIOTimer()

XrdThrottleTimer XrdThrottleManager::StartIOTimer ( uint16_t  uid,
bool &  ok 
)

Definition at line 728 of file XrdThrottleManager.cc.

729 {
730  int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
731  m_io_total++;
732 
733  while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
734  {
735  // If the user has essentially no concurrency, then we let them
736  // temporarily exceed the limit. This prevents potential waits for
737  // every single read for an infrequent user.
738  if (m_waiter_info[uid].m_concurrency < 1)
739  {
740  break;
741  }
742  m_loadshed_limit_hit++;
743  m_io_active.fetch_sub(1, std::memory_order_acq_rel);
744  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
745  ok = m_waiter_info[uid].Wait();
746  if (!ok) {
747  TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
748  return XrdThrottleTimer();
749  }
750  cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
751  }
752 
753  ok = true;
754  return XrdThrottleTimer(this, uid);
755 }
friend class XrdThrottleTimer

References DEBUG, TRACE, and XrdThrottleTimer.

◆ StopIOTimer()

void XrdThrottleManager::StopIOTimer ( std::chrono::steady_clock::duration &  event_duration,
uint16_t  uid 
)
protected

Definition at line 761 of file XrdThrottleManager.cc.

762 {
763  m_io_active_time += event_duration.count();
764  auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
765  m_waiter_info[uid].m_io_time += event_duration.count();
766  if (old_active == static_cast<unsigned>(m_concurrency_limit))
767  {
768  // If we are below the concurrency limit threshold and have another waiter
769  // for our user, then execute it immediately. Otherwise, we will give
770  // someone else a chance to run (as we have gotten more than our share recently).
771  unsigned waiting_users = m_waiting_users;
772  if (waiting_users == 0) waiting_users = 1;
773  if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
774  {
775  std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
776  if (m_waiter_info[uid].m_waiting > 0)
777  {
778  m_waiter_info[uid].NotifyOne(std::move(lock));
779  return;
780  }
781  }
782  NotifyOne();
783  }
784 }

Friends And Related Function Documentation

◆ XrdThrottleTimer

friend class XrdThrottleTimer
friend

Definition at line 57 of file XrdThrottleManager.hh.

Referenced by StartIOTimer().


The documentation for this class was generated from the following files: