20 #ifndef __XrdThrottleManager_hh_
21 #define __XrdThrottleManager_hh_
24 #define likely(x) __builtin_expect(!!(x), 1)
25 #define unlikely(x) __builtin_expect(!!(x), 0)
33 #include <condition_variable>
37 #include <unordered_map>
62 bool OpenFile(
const std::string &entity, std::string &open_error_message);
63 bool CloseFile(
const std::string &entity);
65 void Apply(
int reqsize,
int reqops,
int uid);
69 bool IsThrottling() {
return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
76 void SetThrottles(
float reqbyterate,
float reqoprate,
int concurrency,
float interval_length)
77 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
78 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
80 void SetLoadShed(std::string &hostname,
unsigned port,
unsigned frequency)
81 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
83 void SetMaxOpen(
unsigned long max_open) {m_max_open = max_open;}
85 void SetMaxConns(
unsigned long max_conns) {m_max_conns = max_conns;}
87 void SetMaxWait(
unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
99 void PrepLoadShed(
const char *opaque, std::string &lsOpaque);
103 void PerformLoadShed(
const std::string &opaque, std::string &host,
unsigned &port);
114 void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
121 uint16_t GetUid(
const std::string &);
125 void RecomputeInternal();
128 void * RecomputeBootstrap(
void *pp);
131 void ComputeWaiterOrder();
138 void UserIOAccounting();
142 void GetShares(
int &shares,
int &request);
144 void StealShares(
int uid,
int &reqsize,
int &reqops);
150 static unsigned GetTimerListHash();
161 float m_interval_length_seconds;
162 float m_bytes_per_second;
163 float m_ops_per_second;
164 int m_concurrency_limit;
168 static constexpr
int m_max_users = 1024;
169 std::vector<int> m_primary_bytes_shares;
170 std::vector<int> m_secondary_bytes_shares;
171 std::vector<int> m_primary_ops_shares;
172 std::vector<int> m_secondary_ops_shares;
173 int m_last_round_allocation;
176 struct alignas(64) Waiter
178 std::condition_variable m_cv;
180 unsigned m_waiting{0};
198 void NotifyOne(std::unique_lock<std::mutex> lock)
203 std::array<Waiter, m_max_users> m_waiter_info;
216 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
217 std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1;
219 std::atomic<size_t> m_waiter_offset{0};
220 std::chrono::steady_clock::time_point m_last_waiter_recompute_time;
223 std::atomic<uint32_t> m_io_active;
227 int m_stable_io_active{0};
228 uint64_t m_stable_io_total{0};
230 std::chrono::steady_clock::duration m_stable_io_wait;
233 std::string m_loadshed_host;
234 unsigned m_loadshed_port;
235 unsigned m_loadshed_frequency;
243 unsigned long m_max_open{0};
244 unsigned long m_max_conns{0};
245 std::unordered_map<std::string, unsigned long> m_file_counters;
246 std::unordered_map<std::string, unsigned long> m_conn_counters;
247 std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
248 std::mutex m_file_mutex;
259 #if defined(__linux__)
260 static constexpr
size_t m_timer_list_size = 32;
262 static constexpr
size_t m_timer_list_size = 1;
264 std::array<TimerList, m_timer_list_size> m_timer_list;
269 std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
274 static const char *TraceID;
295 m_start_time(std::chrono::steady_clock::time_point::min())
302 m_start_time(std::chrono::steady_clock::now())
307 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
308 std::lock_guard<std::mutex> lock(timerList.m_mutex);
309 if (timerList.m_first ==
nullptr) {
310 timerList.m_first =
this;
312 m_prev = timerList.m_last;
313 m_prev->m_next =
this;
315 timerList.m_last =
this;
318 std::chrono::steady_clock::duration
Reset() {
319 auto now = std::chrono::steady_clock::now();
320 auto last_start = m_start_time.
exchange(now);
321 return now - last_start;
328 if (!m_manager)
return;
330 auto event_duration =
Reset();
331 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
333 std::unique_lock<std::mutex> lock(timerList.m_mutex);
335 m_prev->m_next = m_next;
337 m_next->m_prev = m_prev;
339 timerList.m_last = m_prev;
342 timerList.m_first = m_next;
344 m_next->m_prev =
nullptr;
346 timerList.m_last =
nullptr;
353 const uint16_t m_owner{0};
354 const uint16_t m_timer_list_entry{0};
T exchange(T v, std::memory_order mo=std::memory_order_relaxed) noexcept
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
std::chrono::steady_clock::duration Reset()