XRootD
XrdPfcFile.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_FILE_HH__
2 #define __XRDPFC_FILE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 
21 #include "XrdPfcTypes.hh"
22 #include "XrdPfcInfo.hh"
23 #include "XrdPfcStats.hh"
24 
26 
27 #include "XrdOuc/XrdOucCache.hh"
28 #include "XrdOuc/XrdOucIOVec.hh"
29 
30 #include <functional>
31 #include <map>
32 #include <set>
33 #include <string>
34 
35 class XrdJob;
36 class XrdOucIOVec;
37 
38 namespace XrdCl
39 {
40 class Log;
41 }
42 
43 namespace XrdPfc
44 {
45 class File;
46 class BlockResponseHandler;
47 class DirectResponseHandler;
48 class IO;
49 
50 struct ReadVBlockListRAM;
51 struct ReadVChunkListRAM;
52 struct ReadVBlockListDisk;
53 struct ReadVChunkListDisk;
54 
55 struct ReadReqRH : public XrdOucCacheIOCB
56 {
57  int m_expected_size = 0;
58  int m_n_chunks = 0; // Only set for ReadV().
59  unsigned short m_seq_id;
60  XrdOucCacheIOCB *m_iocb; // External callback passed into IO::Read().
61 
62  ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb) :
63  m_seq_id(sid), m_iocb(iocb)
64  {}
65 };
66 
67 // -------------------------------------------------------------
68 
70 {
71  IO *m_io;
72  ReadReqRH *m_rh; // Internal callback created in IO::Read().
73 
74  long long m_bytes_read = 0;
75  int m_error_cond = 0; // to be set to -errno
76  int m_error_count = 0;
78 
79  int m_n_chunk_reqs = 0;
80  bool m_sync_done = false;
81  bool m_direct_done = true;
82 
83  ReadRequest(IO *io, ReadReqRH *rh) :
84  m_io(io), m_rh(rh)
85  {}
86 
88 
89  bool is_complete() const { return m_n_chunk_reqs == 0 && m_sync_done && m_direct_done; }
90  int return_value() const { return m_error_cond ? m_error_cond : m_bytes_read; }
91 };
92 
93 // -------------------------------------------------------------
94 
96 {
98  char *m_buf; // Where to place the data chunk.
99  long long m_off; // Offset *within* the corresponding block.
100  int m_size; // Size of the data chunk.
101 
102  ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size) :
103  m_read_req(rreq), m_buf(buf), m_off(off), m_size(size)
104  {}
105 };
106 
107 using vChunkRequest_t = std::vector<ChunkRequest>;
108 using vChunkRequest_i = std::vector<ChunkRequest>::iterator;
109 
110 // ================================================================
111 
112 class Block
113 {
114 public:
116  IO *m_io; // IO that handled current request, used for == / != comparisons only
117  void *m_req_id; // Identity of requestor -- used for stats.
118 
119  char *m_buff;
120  long long m_offset;
121  int m_size;
123  int m_refcnt;
124  int m_errno; // stores negative errno
130 
132 
133  Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize,
134  bool m_prefetch, bool cks_net) :
135  m_file(f), m_io(io), m_req_id(rid),
136  m_buff(buf), m_offset(off), m_size(size), m_req_size(rsize),
138  m_req_cksum_net(cks_net), m_n_cksum_errors(0)
139  {}
140 
141  char* get_buff() const { return m_buff; }
142  int get_size() const { return m_size; }
143  int get_req_size() const { return m_req_size; }
144  long long get_offset() const { return m_offset; }
145 
146  File* get_file() const { return m_file; }
147  IO* get_io() const { return m_io; }
148  void* get_req_id() const { return m_req_id; }
149 
150  bool is_finished() const { return m_downloaded || m_errno != 0; }
151  bool is_ok() const { return m_downloaded; }
152  bool is_failed() const { return m_errno != 0; }
153 
154  void set_downloaded() { m_downloaded = true; }
155  void set_error(int err) { m_errno = err; }
156  int get_error() const { return m_errno; }
157 
158  void reset_error_and_set_io(IO *io, void *rid)
159  {
160  m_errno = 0;
161  m_io = io;
162  m_req_id = rid;
163  }
164 
165  bool req_cksum_net() const { return m_req_cksum_net; }
166  bool has_cksums() const { return ! m_cksum_vec.empty(); }
170 };
171 
172 using BlockList_t = std::list<Block*>;
173 using BlockList_i = std::list<Block*>::iterator;
174 
175 // ================================================================
176 
178 {
179 public:
181 
183 
184  void Done(int result) override;
185 };
186 
187 // ----------------------------------------------------------------
188 
190 {
191 public:
196  int m_bytes_read = 0;
197  int m_errno = 0;
198 
199  DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait) :
200  m_file(file), m_read_req(rreq), m_to_wait(to_wait)
201  {}
202 
203  void Done(int result) override;
204 };
205 
206 // ================================================================
207 
208 class File
209 {
210  friend class BlockResponseHandler;
211  friend class DirectResponseHandler;
212 public:
213  // Constructor and Open() are private.
214 
216  static File* FileOpen(const std::string &path, long long offset, long long fileSize);
217 
219  ~File();
220 
223 
225  void BlocksRemovedFromWriteQ(std::list<Block*>&);
226 
228  int Read(IO *io, char* buff, long long offset, int size, ReadReqRH *rh);
229 
231  int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh);
232 
233  //----------------------------------------------------------------------
235  //----------------------------------------------------------------------
236  void ioUpdated(IO *io);
237 
238  //----------------------------------------------------------------------
241  //----------------------------------------------------------------------
242  bool ioActive(IO *io);
243 
244  //----------------------------------------------------------------------
247  //----------------------------------------------------------------------
249 
250  //----------------------------------------------------------------------
253  //----------------------------------------------------------------------
254  bool FinalizeSyncBeforeExit();
255 
256  //----------------------------------------------------------------------
258  //----------------------------------------------------------------------
259  void Sync();
260 
261  void WriteBlockToDisk(Block* b);
262 
263  void Prefetch();
264 
265  float GetPrefetchScore() const;
266 
268  const char* lPath() const;
269 
270  const std::string& GetLocalPath() const { return m_filename; }
271 
272  XrdSysError* GetLog();
274 
275  long long GetFileSize() const { return m_file_size; }
276 
277  void AddIO(IO *io);
280  void RemoveIO(IO *io);
281 
282  std::string GetRemoteLocations() const;
283  const Info::AStat* GetLastAccessStats() const { return m_cfi.GetLastAccessStats(); }
284  size_t GetAccessCnt() const { return m_cfi.GetAccessCnt(); }
285  int GetBlockSize() const { return m_cfi.GetBufferSize(); }
286  int GetNBlocks() const { return m_cfi.GetNBlocks(); }
287  int GetNDownloadedBlocks() const { return m_cfi.GetNDownloadedBlocks(); }
288  long long GetPrefetchedBytes() const { return m_prefetch_bytes; }
289  const Stats& RefStats() const { return m_stats; }
290 
291  int Fstat(struct stat &sbuff);
292 
293  // These three methods are called under Cache's m_active lock
294  int get_ref_cnt() { return m_ref_cnt; }
295  int inc_ref_cnt() { return ++m_ref_cnt; }
296  int dec_ref_cnt() { return --m_ref_cnt; }
297 
299  bool is_in_emergency_shutdown() { return m_in_shutdown; }
300 
301 private:
303  File(const std::string &path, long long offset, long long fileSize);
304 
306  bool Open();
307 
308  static const char *m_traceID;
309 
310  int m_ref_cnt;
311 
312  XrdOssDF *m_data_file;
313  XrdOssDF *m_info_file;
314  Info m_cfi;
315 
316  const std::string m_filename;
317  const long long m_offset;
318  const long long m_file_size;
319 
320  // IO objects attached to this file.
321 
322  typedef std::set<IO*> IoSet_t;
323  typedef IoSet_t::iterator IoSet_i;
324 
325  IoSet_t m_io_set;
326  IoSet_i m_current_io;
327  int m_ios_in_detach;
328 
329  // FSync
330 
331  std::vector<int> m_writes_during_sync;
332  int m_non_flushed_cnt;
333  bool m_in_sync;
334  bool m_detach_time_logged;
335  bool m_in_shutdown;
336 
337  // Block state and management
338 
339  typedef std::list<int> IntList_t;
340  typedef IntList_t::iterator IntList_i;
341 
342  typedef std::map<int, Block*> BlockMap_t;
343  typedef BlockMap_t::iterator BlockMap_i;
344 
345  BlockMap_t m_block_map;
346  XrdSysCondVar m_state_cond;
347  long long m_block_size;
348  int m_num_blocks;
349 
350  // Stats and ResourceMonitor interface
351 
352  Stats m_stats;
353  Stats m_delta_stats;
354  long long m_st_blocks;
355  long long m_resmon_report_threshold;
356  int m_resmon_token;
357 
358  void check_delta_stats();
359  void report_and_merge_delta_stats();
360 
361  std::set<std::string> m_remote_locations;
362  void insert_remote_location(const std::string &loc);
363 
364  // Prefetch
365 
366  enum PrefetchState_e { kOff=-1, kOn, kHold, kStopped, kComplete };
367 
368  PrefetchState_e m_prefetch_state;
369 
370  long long m_prefetch_bytes;
371  int m_prefetch_read_cnt;
372  int m_prefetch_hit_cnt;
373  float m_prefetch_score; // cached
374 
375  void inc_prefetch_read_cnt(int prc) { if (prc) { m_prefetch_read_cnt += prc; calc_prefetch_score(); } }
376  void inc_prefetch_hit_cnt (int phc) { if (phc) { m_prefetch_hit_cnt += phc; calc_prefetch_score(); } }
377  void calc_prefetch_score() { m_prefetch_score = float(m_prefetch_hit_cnt) / m_prefetch_read_cnt; }
378 
379  // Helpers
380 
381  bool overlap(int blk, // block to query
382  long long blk_size, //
383  long long req_off, // offset of user request
384  int req_size, // size of user request
385  // output:
386  long long &off, // offset in user buffer
387  long long &blk_off, // offset in block
388  int &size);
389 
390  // Read & ReadV
391 
392  Block* PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch);
393 
394  void ProcessBlockRequest (Block *b);
395  void ProcessBlockRequests(BlockList_t& blks);
396 
397  void RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size);
398 
399  int ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size);
400 
401  int ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
402  ReadReqRH *rh, const char *tpfx);
403 
404  void ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond);
405  void ProcessBlockError(Block *b, ReadRequest *rreq);
406  void ProcessBlockSuccess(Block *b, ChunkRequest &creq);
407  void FinalizeReadRequest(ReadRequest *rreq);
408 
409  void ProcessBlockResponse(Block *b, int res);
410 
411  // Block management
412 
413  void inc_ref_count(Block* b);
414  void dec_ref_count(Block* b, int count = 1);
415  void free_block(Block*);
416 
417  bool select_current_io_or_disable_prefetching(bool skip_current);
418 
419  int offsetIdx(int idx) const;
420 };
421 
422 //------------------------------------------------------------------------------
423 
424 inline void File::inc_ref_count(Block* b)
425 {
426  // Method always called under lock.
427  b->m_refcnt++;
428 }
429 
430 //------------------------------------------------------------------------------
431 
432 inline void File::dec_ref_count(Block* b, int count)
433 {
434  // Method always called under lock.
435  assert(b->is_finished());
436  b->m_refcnt -= count;
437  assert(b->m_refcnt >= 0);
438 
439  if (b->m_refcnt == 0)
440  {
441  free_block(b);
442  }
443 }
444 
445 }
446 
447 #endif
int stat(const char *path, struct stat *buf)
XrdOucString File
Definition: XrdJob.hh:43
void Done(int result) override
Definition: XrdPfcFile.cc:1645
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:169
int get_size() const
Definition: XrdPfcFile.hh:142
int get_error() const
Definition: XrdPfcFile.hh:156
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:168
Block(File *f, IO *io, void *rid, char *buf, long long off, int size, int rsize, bool m_prefetch, bool cks_net)
Definition: XrdPfcFile.hh:133
void * get_req_id() const
Definition: XrdPfcFile.hh:148
long long get_offset() const
Definition: XrdPfcFile.hh:144
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:131
bool is_finished() const
Definition: XrdPfcFile.hh:150
bool is_ok() const
Definition: XrdPfcFile.hh:151
void set_error(int err)
Definition: XrdPfcFile.hh:155
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:167
int m_n_cksum_errors
Definition: XrdPfcFile.hh:129
char * get_buff() const
Definition: XrdPfcFile.hh:141
IO * get_io() const
Definition: XrdPfcFile.hh:147
void set_downloaded()
Definition: XrdPfcFile.hh:154
bool req_cksum_net() const
Definition: XrdPfcFile.hh:165
void * m_req_id
Definition: XrdPfcFile.hh:117
bool has_cksums() const
Definition: XrdPfcFile.hh:166
File * get_file() const
Definition: XrdPfcFile.hh:146
bool is_failed() const
Definition: XrdPfcFile.hh:152
long long m_offset
Definition: XrdPfcFile.hh:120
vCkSum_t m_cksum_vec
Definition: XrdPfcFile.hh:128
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:158
bool m_req_cksum_net
Definition: XrdPfcFile.hh:127
int get_req_size() const
Definition: XrdPfcFile.hh:143
void Done(int result) override
Definition: XrdPfcFile.cc:1653
DirectResponseHandler(File *file, ReadRequest *rreq, int to_wait)
Definition: XrdPfcFile.hh:199
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:287
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1501
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:749
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1599
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1032
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:109
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1589
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1594
int GetNBlocks() const
Definition: XrdPfcFile.hh:286
void Prefetch()
Definition: XrdPfcFile.cc:1516
void StopPrefetchingOnIO(IO *io)
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1613
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:283
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:284
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:527
void AddIO(IO *io)
Definition: XrdPfcFile.cc:311
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:281
long long GetPrefetchedBytes() const
Definition: XrdPfcFile.hh:288
int GetBlockSize() const
Definition: XrdPfcFile.hh:285
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:287
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:181
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:122
int inc_ref_cnt()
Definition: XrdPfcFile.hh:295
int GetPrefetchCountOnIO(IO *io)
void Sync()
Sync file cache inf o and output data with disk.
Definition: XrdPfcFile.cc:1115
int dec_ref_cnt()
Definition: XrdPfcFile.hh:296
int get_ref_cnt()
Definition: XrdPfcFile.hh:294
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:712
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:195
long long GetFileSize() const
Definition: XrdPfcFile.hh:275
const Stats & RefStats() const
Definition: XrdPfcFile.hh:289
~File()
Destructor.
Definition: XrdPfcFile.cc:80
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:348
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:173
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:270
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:299
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:204
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
const AStat * GetLastAccessStats() const
Get latest access stats.
Definition: XrdPfcInfo.cc:491
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
int GetNDownloadedBlocks() const
Get number of downloaded blocks.
Definition: XrdPfcInfo.hh:398
size_t GetAccessCnt() const
Get number of accesses.
Definition: XrdPfcInfo.hh:261
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XrdSysError Log
Definition: XrdConfig.cc:112
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:107
std::vector< ChunkRequest >::iterator vChunkRequest_i
Definition: XrdPfcFile.hh:108
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:172
std::vector< uint32_t > vCkSum_t
Definition: XrdPfcTypes.hh:31
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:173
ChunkRequest(ReadRequest *rreq, char *buf, long long off, int size)
Definition: XrdPfcFile.hh:102
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:97
Access statistics.
Definition: XrdPfcInfo.hh:57
XrdOucCacheIOCB * m_iocb
Definition: XrdPfcFile.hh:60
unsigned short m_seq_id
Definition: XrdPfcFile.hh:59
ReadReqRH(unsigned short sid, XrdOucCacheIOCB *iocb)
Definition: XrdPfcFile.hh:62
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:87
ReadRequest(IO *io, ReadReqRH *rh)
Definition: XrdPfcFile.hh:83
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:72
bool is_complete() const
Definition: XrdPfcFile.hh:89
int return_value() const
Definition: XrdPfcFile.hh:90
long long m_bytes_read
Definition: XrdPfcFile.hh:74