XRootD
XrdThrottleFile.cc
Go to the documentation of this file.
1 
2 #include "XrdSfs/XrdSfsAio.hh"
3 #include "XrdSec/XrdSecEntity.hh"
5 
6 #include "XrdThrottle.hh"
7 
8 using namespace XrdThrottle;
9 
// Load-shedding guard for File methods.  When the throttle manager reports
// that this client should be shed, ask it for a target host/port, log the
// event against the connection id, store the target in `error` and make the
// ENCLOSING function return SFS_REDIRECT.
// Relies on `m_throttle`, `m_loadshed`, `m_eroute`, `m_connection_id` and
// `error` being in scope at the expansion site.
// `port` starts at 0 so that a PerformLoadShed() call that leaves it
// untouched cannot cause an indeterminate value to be handed to setErrInfo().
#define DO_LOADSHED if (m_throttle.CheckLoadShed(m_loadshed)) \
{ \
   unsigned port = 0; \
   std::string host; \
   m_throttle.PerformLoadShed(m_loadshed, host, port); \
   m_eroute.Emsg("File", "Performing load-shed for client", m_connection_id.c_str()); \
   error.setErrInfo(port, host.c_str()); \
   return SFS_REDIRECT; \
}
19 
// Throttling prologue for an I/O method moving `amount` bytes.
//  1. Runs the DO_LOADSHED guard (may return SFS_REDIRECT from the caller).
//  2. Charges `amount` bytes and one operation to m_uid via Apply().
//  3. Starts an I/O timer; the resulting `xtimer` object is deliberately left
//     in the ENCLOSING function's scope, so this macro cannot be wrapped in a
//     do { } while (0) block.
//  4. If the timer could not be started, sets EMFILE in `error` and makes the
//     enclosing function return SFS_ERROR.
// `ok` is initialized to false so the check fails closed should
// StartIOTimer() ever return without assigning it.
#define DO_THROTTLE(amount) \
DO_LOADSHED \
m_throttle.Apply(amount, 1, m_uid); \
bool ok = false; \
auto xtimer = m_throttle.StartIOTimer(m_uid, ok); \
if (!ok) { \
   error.setErrInfo(EMFILE, "I/O limit exceeded and wait time hit"); \
   return SFS_ERROR; \
}
29 
30 
31 File::File(const char *user,
32  unique_sfs_ptr sfs,
33  XrdThrottleManager &throttle,
34  XrdSysError &eroute)
35  : XrdSfsFile(sfs->error), // Use underlying error object as ours
36 #if __cplusplus >= 201103L
37  m_sfs(std::move(sfs)), // Guaranteed to be non-null by FileSystem::newFile
38 #else
39  m_sfs(sfs),
40 #endif
41  m_uid(0),
42  m_connection_id(user ? user : ""),
43  m_throttle(throttle),
44  m_eroute(eroute)
45 {}
46 
47 File::~File()
48 {
49  if (m_is_open) {
50  m_throttle.CloseFile(m_user);
51  }
52 }
53 
54 int
55 File::open(const char *fileName,
56  XrdSfsFileOpenMode openMode,
57  mode_t createMode,
58  const XrdSecEntity *client,
59  const char *opaque)
60 {
61  std::tie(m_user, m_uid) = m_throttle.GetUserInfo(client);
62  m_throttle.PrepLoadShed(opaque, m_loadshed);
63  std::string open_error_message;
64  if (!m_throttle.OpenFile(m_user, open_error_message)) {
65  error.setErrInfo(EMFILE, open_error_message.c_str());
66  return SFS_ERROR;
67  }
68  auto retval = m_sfs->open(fileName, openMode, createMode, client, opaque);
69  if (retval != SFS_ERROR) {
70  m_is_open = true;
71  } else {
72  m_throttle.CloseFile(m_user);
73  }
74  return retval;
75 }
76 
77 int
79 {
80  m_is_open = false;
81  m_throttle.CloseFile(m_user);
82  return m_sfs->close();
83 }
84 
85 int File::checkpoint(cpAct act, struct iov *range, int n)
86 { return m_sfs->checkpoint(act, range, n);}
87 
88 int
89 File::fctl(const int cmd,
90  const char *args,
91  XrdOucErrInfo &out_error)
92 {
93  // Disable sendfile
94  if (cmd == SFS_FCTL_GETFD)
95  {
96  error.setErrInfo(ENOTSUP, "Sendfile not supported by throttle plugin.");
97  return SFS_ERROR;
98  }
99  else return m_sfs->fctl(cmd, args, out_error);
100 }
101 
102 const char *
104 {
105  return m_sfs->FName();
106 }
107 
108 int
109 File::getMmap(void **Addr, off_t &Size)
110 { // We cannot monitor mmap-based reads, so we disable them.
111  error.setErrInfo(ENOTSUP, "Mmap not supported by throttle plugin.");
112  return SFS_ERROR;
113 }
114 
117  char *buffer,
118  XrdSfsXferSize rdlen,
119  uint32_t *csvec,
120  uint64_t opts)
121 {
122  DO_THROTTLE(rdlen)
123  return m_sfs->pgRead(offset, buffer, rdlen, csvec, opts);
124 }
125 
127 File::pgRead(XrdSfsAio *aioparm, uint64_t opts)
128 { // We disable all AIO-based reads.
129  aioparm->Result = this->pgRead((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
130  (char *)aioparm->sfsAio.aio_buf,
131  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes,
132  aioparm->cksVec, opts);
133  aioparm->doneRead();
134  return SFS_OK;
135 }
136 
139  char *buffer,
140  XrdSfsXferSize rdlen,
141  uint32_t *csvec,
142  uint64_t opts)
143 {
144  DO_THROTTLE(rdlen)
145  return m_sfs->pgWrite(offset, buffer, rdlen, csvec, opts);
146 }
147 
149 File::pgWrite(XrdSfsAio *aioparm, uint64_t opts)
150 { // We disable all AIO-based writes.
151  aioparm->Result = this->pgWrite((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
152  (char *)aioparm->sfsAio.aio_buf,
153  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes,
154  aioparm->cksVec, opts);
155  aioparm->doneWrite();
156  return SFS_OK;
157 }
158 
159 int
161  XrdSfsXferSize amount)
162 {
163  DO_THROTTLE(amount)
164  return m_sfs->read(fileOffset, amount);
165 }
166 
169  char *buffer,
170  XrdSfsXferSize buffer_size)
171 {
172  DO_THROTTLE(buffer_size);
173  return m_sfs->read(fileOffset, buffer, buffer_size);
174 }
175 
176 int
178 { // We disable all AIO-based reads.
179  aioparm->Result = this->read((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
180  (char *)aioparm->sfsAio.aio_buf,
181  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes);
182  aioparm->doneRead();
183  return SFS_OK;
184 }
185 
188  const char *buffer,
189  XrdSfsXferSize buffer_size)
190 {
191  DO_THROTTLE(buffer_size);
192  return m_sfs->write(fileOffset, buffer, buffer_size);
193 }
194 
195 int
197 {
198  aioparm->Result = this->write((XrdSfsFileOffset)aioparm->sfsAio.aio_offset,
199  (char *)aioparm->sfsAio.aio_buf,
200  (XrdSfsXferSize)aioparm->sfsAio.aio_nbytes);
201  aioparm->doneWrite();
202  return SFS_OK;
203 }
204 
205 int
207 {
208  return m_sfs->sync();
209 }
210 
211 int
213 {
214  return m_sfs->sync(aiop);
215 }
216 
217 int
218 File::stat(struct stat *buf)
219 {
220  return m_sfs->stat(buf);
221 }
222 
223 int
225 {
226  return m_sfs->truncate(fileOffset);
227 }
228 
229 int
230 File::getCXinfo(char cxtype[4], int &cxrsz)
231 {
232  return m_sfs->getCXinfo(cxtype, cxrsz);
233 }
234 
235 int
237  XrdSfsFileOffset offset,
238  XrdSfsXferSize size)
239 {
240  DO_THROTTLE(size);
241  return m_sfs->SendData(sfDio, offset, size);
242 }
243 
int stat(const char *path, struct stat *buf)
struct myOpts opts
off_t aio_offset
Definition: XrdSfsAio.hh:49
size_t aio_nbytes
Definition: XrdSfsAio.hh:48
void * aio_buf
Definition: XrdSfsAio.hh:47
#define SFS_ERROR
#define SFS_FCTL_GETFD
int XrdSfsFileOpenMode
#define SFS_OK
long long XrdSfsFileOffset
int XrdSfsXferSize
if(Avsz)
#define DO_THROTTLE(amount)
int setErrInfo(int code, const char *emsg)
uint32_t * cksVec
Definition: XrdSfsAio.hh:63
ssize_t Result
Definition: XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition: XrdSfsAio.hh:62
virtual void doneWrite()=0
XrdOucErrInfo & error
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
virtual int getCXinfo(char cxtype[4], int &cxrsz) override
virtual const char * FName() override
virtual int sync() override
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client, const char *opaque=0) override
virtual int close() override
virtual XrdSfsXferSize pgWrite(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0) override
virtual XrdSfsXferSize pgRead(XrdSfsFileOffset offset, char *buffer, XrdSfsXferSize rdlen, uint32_t *csvec, uint64_t opts=0) override
virtual int SendData(XrdSfsDio *sfDio, XrdSfsFileOffset offset, XrdSfsXferSize size) override
virtual int checkpoint(cpAct act, struct iov *range=0, int n=0) override
virtual int read(XrdSfsFileOffset fileOffset, XrdSfsXferSize amount) override
virtual int truncate(XrdSfsFileOffset fileOffset) override
virtual int getMmap(void **Addr, off_t &Size) override
virtual XrdSfsXferSize write(XrdSfsFileOffset fileOffset, const char *buffer, XrdSfsXferSize buffer_size) override
virtual int fctl(const int cmd, const char *args, XrdOucErrInfo &eInfo)=0
virtual int stat(struct stat *buf) override
std::auto_ptr< XrdSfsFile > unique_sfs_ptr
Definition: XrdThrottle.hh:23