20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
33 #include <condition_variable>
37 #include <shared_mutex>
38 #include <unordered_map>
// Declarations only; the bodies are not part of this view.
//
// OpenFile: takes the entity name read-only plus a writable
// `open_error_message`. Presumably the message is filled in when the call
// returns false -- TODO confirm against the definition.
63 bool OpenFile(
const std::string &entity, std::string &open_error_message);
// CloseFile: counterpart keyed by the same `entity` string; the meaning of the
// bool result is not visible here -- verify against the definition.
64 bool CloseFile(
const std::string &entity);
// Apply: takes a request size, a request op count and a numeric uid, all by
// value. NOTE(review): presumably this is where a caller is charged against
// the byte/op limits and may block -- confirm in the implementation.
66 void Apply(
int reqsize,
int reqops,
int uid);
70 bool IsThrottling() {
return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
77 void SetThrottles(
float reqbyterate,
float reqoprate,
int concurrency,
float interval_length)
78 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
79 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
81 void SetLoadShed(std::string &hostname,
unsigned port,
unsigned frequency)
82 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
84 void SetMaxOpen(
unsigned long max_open) {m_max_open = max_open;}
86 void SetMaxConns(
unsigned long max_conns) {m_max_conns = max_conns;}
88 void SetMaxWait(
unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
// Declarations only; none of the bodies are part of this view, so the notes
// below describe signatures and flag everything else as an assumption.

// PrepLoadShed: reads the C-string `opaque` and takes `lsOpaque` by non-const
// reference (presumably the rewritten opaque string -- confirm).
111 void PrepLoadShed(
const char *opaque, std::string &lsOpaque);
// PerformLoadShed: `host` and `port` are non-const references, so presumably
// outputs (likely taken from m_loadshed_host / m_loadshed_port -- confirm).
115 void PerformLoadShed(
const std::string &opaque, std::string &host,
unsigned &port);
// StopIOTimer: receives a steady_clock duration by non-const reference and a
// 16-bit uid. NOTE(review): whether the duration is read, written, or both is
// not visible here.
126 void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
// GetUid: maps a string (parameter unnamed) to a 16-bit id.
133 uint16_t GetUid(
const std::string &);
137 void RecomputeInternal();
// RecomputeBootstrap: void*(void*) signature -- looks like a thread entry
// trampoline; TODO confirm how `pp` is interpreted.
140 void * RecomputeBootstrap(
void *pp);
143 void ComputeWaiterOrder();
150 void UserIOAccounting();
// GetShares / StealShares: all int& parameters are non-const and may therefore
// be modified by the callee; exact semantics must be checked in the definition.
154 void GetShares(
int &shares,
int &request);
156 void StealShares(
int uid,
int &reqsize,
int &reqops);
// GetTimerListHash: static, returns unsigned. Presumably yields an index into
// m_timer_list -- verify against the definition.
162 static unsigned GetTimerListHash();
// Throttle configuration. All four values are written together by
// SetThrottles(); the two rates are what IsThrottling() tests for > 0.
173 float m_interval_length_seconds;
174 float m_bytes_per_second;
175 float m_ops_per_second;
176 int m_concurrency_limit;
// Compile-time bound used as the size of the fixed per-user arrays
// (m_waiter_info, m_wake_order_0, m_wake_order_1).
180 static constexpr
int m_max_users = 1024;
// Share tables for bytes and ops, each with a primary and a secondary vector.
// NOTE(review): presumably indexed by uid and sized to m_max_users -- the
// sizing code is not visible here; confirm.
181 std::vector<int> m_primary_bytes_shares;
182 std::vector<int> m_secondary_bytes_shares;
183 std::vector<int> m_primary_ops_shares;
184 std::vector<int> m_secondary_ops_shares;
185 int m_last_round_allocation;
188 struct alignas(64) Waiter
190 std::condition_variable m_cv;
192 unsigned m_waiting{0};
210 void NotifyOne(std::unique_lock<std::mutex> lock)
// One Waiter slot per possible user (m_max_users entries).
215 std::array<Waiter, m_max_users> m_waiter_info;
// Two equally sized wake-order tables of 16-bit atomics.
// NOTE(review): the _0/_1 pairing suggests the tables are alternated between
// recomputations -- confirm in ComputeWaiterOrder().
228 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
229 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1;
231 std::atomic<size_t> m_waiter_offset{0};
232 std::chrono::steady_clock::time_point m_last_waiter_recompute_time;
// Atomic counter; has no in-class initializer, unlike the m_stable_* values.
235 std::atomic<uint32_t> m_io_active;
// Non-atomic I/O statistics. m_stable_io_wait has no in-class initializer.
239 int m_stable_io_active{0};
240 uint64_t m_stable_io_total{0};
242 std::chrono::steady_clock::duration m_stable_io_wait;
// Load-shed target and frequency; all three are written by SetLoadShed().
245 std::string m_loadshed_host;
246 unsigned m_loadshed_port;
247 unsigned m_loadshed_frequency;
// Limits written by SetMaxOpen() / SetMaxConns(); both default to 0.
255 unsigned long m_max_open{0};
256 unsigned long m_max_conns{0};
// String-keyed counters plus a per-key map from pid_t to a count.
// NOTE(review): presumably keyed by the `entity` passed to OpenFile/CloseFile
// and guarded by m_file_mutex -- confirm in the implementation.
257 std::unordered_map<std::string, unsigned long> m_file_counters;
258 std::unordered_map<std::string, unsigned long> m_conn_counters;
259 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
260 std::mutex m_file_mutex;
264 unsigned long max_conn{0};
265 bool is_wildcard{
false};
// String-keyed UserLimit table with its own shared (reader/writer) mutex.
// NOTE(review): presumably keyed by user name and loaded from
// m_user_config_file -- confirm against the loader.
267 std::unordered_map<std::string, UserLimit> m_user_limits;
268 std::shared_mutex m_user_limits_mutex;
269 std::string m_user_config_file;
280 #if defined(__linux__)
281 static constexpr
size_t m_timer_list_size = 32;
283 static constexpr
size_t m_timer_list_size = 1;
// Fixed array of timer lists, m_timer_list_size entries long. XrdThrottleTimer
// indexes it with its m_timer_list_entry and locks the selected entry's
// m_mutex before linking or unlinking itself.
285 std::array<TimerList, m_timer_list_size> m_timer_list;
// Maximum wait time; defaults to 30 seconds and is overwritten by SetMaxWait().
290 std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
295 static const char *TraceID;
316 m_start_time(std::chrono::steady_clock::time_point::min())
323 m_start_time(std::chrono::steady_clock::now())
328 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
329 std::lock_guard<std::mutex> lock(timerList.m_mutex);
330 if (timerList.m_first ==
nullptr) {
331 timerList.m_first =
this;
333 m_prev = timerList.m_last;
334 m_prev->m_next =
this;
336 timerList.m_last =
this;
// Restart the timer and return how long it had been running.
//
// Reads the current steady_clock time, swaps it into m_start_time via
// exchange(), and returns the difference between the new and the previous
// start time.
// NOTE(review): m_start_time appears to be an atomic-like wrapper (it exposes
// exchange()); confirm its type and memory ordering.
339 std::chrono::steady_clock::duration
Reset() {
340 auto now = std::chrono::steady_clock::now();
// Swap in the new start time and keep the old one for the subtraction below.
341 auto last_start = m_start_time.
exchange(now);
342 return now - last_start;
349 if (!m_manager)
return;
351 auto event_duration =
Reset();
352 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
354 std::unique_lock<std::mutex> lock(timerList.m_mutex);
356 m_prev->m_next = m_next;
358 m_next->m_prev = m_prev;
360 timerList.m_last = m_prev;
363 timerList.m_first = m_next;
365 m_next->m_prev =
nullptr;
367 timerList.m_last =
nullptr;
// Immutable per-timer values, both defaulting to 0.
// m_timer_list_entry selects which entry of the manager's m_timer_list this
// timer links into. m_owner is presumably the uid the timer was started for --
// confirm against the constructor.
374 const uint16_t m_owner{0};
375 const uint16_t m_timer_list_entry{0};
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
unsigned long GetUserMaxConn(const std::string &username)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
std::chrono::steady_clock::duration Reset()