XRootD
XrdPfcFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfc.hh"
22 #include "XrdPfcResourceMonitor.hh"
23 #include "XrdPfcIO.hh"
24 #include "XrdPfcTrace.hh"
25 
26 #include "XProtocol/XProtocol.hh"
27 #include "XrdSys/XrdSysTimer.hh"
28 #include "XrdOss/XrdOss.hh"
29 #include "XrdOuc/XrdOucEnv.hh"
31 
32 #include <cstdio>
33 #include <sstream>
34 #include <fcntl.h>
35 #include <cassert>
36 
37 
38 using namespace XrdPfc;
39 
40 namespace
41 {
42 
43 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44 
45 Cache* cache() { return &Cache::GetInstance(); }
46 
47 }
48 
49 const char *File::m_traceID = "File";
50 
51 //------------------------------------------------------------------------------
52 
53 File::File(const std::string& path, long long iOffset, long long iFileSize) :
54  m_ref_cnt(0),
55  m_data_file(0),
56  m_info_file(0),
57  m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58  m_filename(path),
59  m_offset(iOffset),
60  m_file_size(iFileSize),
61  m_current_io(m_io_set.end()),
62  m_ios_in_detach(0),
63  m_non_flushed_cnt(0),
64  m_in_sync(false),
65  m_detach_time_logged(false),
66  m_in_shutdown(false),
67  m_state_cond(0),
68  m_block_size(0),
69  m_num_blocks(0),
70  m_resmon_token(-1),
71  m_prefetch_state(kOff),
72  m_prefetch_bytes(0),
73  m_prefetch_read_cnt(0),
74  m_prefetch_hit_cnt(0),
75  m_prefetch_score(0)
76 {}
77 
78 File::~File()
79 {
80  TRACEF(Debug, "~File() for ");
81 }
82 
83 void File::Close()
84 {
85  // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
86  // A stat is called after close to re-check that m_stat_blocks have been reported correctly
87  // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
88  // in report_and_merge_delta_stats() below.
89  //
90  // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
91  // get reported in as stat.st_blocks.
92  // The reported number is correct in a stat immediately following a close.
93  // If one starts off by writing the last byte of the file, this pre-allocation does not get
94  // triggered up to that point. But comes back with a vengeance right after.
95  //
96  // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.
97 
98  if (m_info_file)
99  {
100  TRACEF(Debug, "Close() closing info-file ");
101  m_info_file->Close();
102  delete m_info_file;
103  m_info_file = nullptr;
104  }
105 
106  if (m_data_file)
107  {
108  TRACEF(Debug, "Close() closing data-file ");
109  m_data_file->Close();
110  delete m_data_file;
111  m_data_file = nullptr;
112  }
113 
114  if (m_resmon_token >= 0)
115  {
116  // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
117  // but in this case the file will get unlinked by the cache and reported as purge event.
118  // We check if the reported st_blocks so far is correct.
119  if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
120  struct stat s;
121  int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
122  if (sr == 0 && s.st_blocks != m_st_blocks) {
123  Stats stats;
124  stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
125  m_st_blocks = s.st_blocks;
126  Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
127  }
128  }
129 
130  Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
131  }
132 
133  TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
134 }
135 
136 //------------------------------------------------------------------------------
137 
138 File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
139 {
140  File *file = new File(path, offset, fileSize);
141  if ( ! file->Open())
142  {
143  delete file;
144  file = 0;
145  }
146  return file;
147 }
148 
149 //------------------------------------------------------------------------------
150 
152 {
153  // Called from Cache::Unlink() when the file is currently open.
154  // Cache::Unlink is also called on FSync error and when wrong number of bytes
155  // is received from a remote read.
156  //
157  // From this point onward the file will not be written to, cinfo file will
158  // not be updated, and all new read requests will return -ENOENT.
159  //
160  // File's entry in the Cache's active map is set to nullptr and will be
161  // removed from there shortly, in any case, well before this File object
162  // shuts down. Cache::Unlink() also reports the appropriate purge event.
163 
164  XrdSysCondVarHelper _lck(m_state_cond);
165 
166  m_in_shutdown = true;
167 
168  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
169  {
170  m_prefetch_state = kStopped;
171  cache()->DeRegisterPrefetchFile(this);
172  }
173 
174  report_and_merge_delta_stats();
175 
176  return m_st_blocks;
177 }
178 
179 //------------------------------------------------------------------------------
180 
181 void File::check_delta_stats()
182 {
183  // Called under m_state_cond lock.
184  // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
185  if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
186  report_and_merge_delta_stats();
187 }
188 
189 void File::report_and_merge_delta_stats()
190 {
191  // Called under m_state_cond lock.
192  struct stat s;
193  m_data_file->Fstat(&s);
194  // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
195  // aggressive pre-allocation in this field (XFS, 4GB).
196  long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
197  : m_file_size >> 9;
198  long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
199  m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
200  m_st_blocks = st_blocks_to_report;
201  Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
202  m_stats.AddUp(m_delta_stats);
203  m_delta_stats.Reset();
204 }
205 
206 //------------------------------------------------------------------------------
207 
209 {
210  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
211 
212  XrdSysCondVarHelper _lck(m_state_cond);
213  dec_ref_count(b);
214 }
215 
216 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
217 {
218  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
219 
220  XrdSysCondVarHelper _lck(m_state_cond);
221 
222  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
223  {
224  dec_ref_count(*i);
225  }
226 }
227 
228 //------------------------------------------------------------------------------
229 
231 {
232  std::string loc(io->GetLocation());
233  XrdSysCondVarHelper _lck(m_state_cond);
234  insert_remote_location(loc);
235 }
236 
237 //------------------------------------------------------------------------------
238 
240 {
241  // Returns true if delay is needed.
242 
243  TRACEF(Debug, "ioActive start for io " << io);
244 
245  std::string loc(io->GetLocation());
246 
247  {
248  XrdSysCondVarHelper _lck(m_state_cond);
249 
250  IoSet_i mi = m_io_set.find(io);
251 
252  if (mi != m_io_set.end())
253  {
254  unsigned int n_active_reads = io->m_active_read_reqs;
255 
256  TRACE(Info, "ioActive for io " << io <<
257  ", active_reads " << n_active_reads <<
258  ", active_prefetches " << io->m_active_prefetches <<
259  ", allow_prefetching " << io->m_allow_prefetching <<
260  ", ios_in_detach " << m_ios_in_detach);
261  TRACEF(Info,
262  "\tio_map.size() " << m_io_set.size() <<
263  ", block_map.size() " << m_block_map.size() << ", file");
264 
265  insert_remote_location(loc);
266 
267  io->m_allow_prefetching = false;
268  io->m_in_detach = true;
269 
270  // Check if any IO is still available for prfetching. If not, stop it.
271  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
272  {
273  if ( ! select_current_io_or_disable_prefetching(false) )
274  {
275  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
276  }
277  }
278 
279  // On last IO, consider write queue blocks. Note, this also contains
280  // blocks being prefetched.
281 
282  bool io_active_result;
283 
284  if (n_active_reads > 0)
285  {
286  io_active_result = true;
287  }
288  else if (m_io_set.size() - m_ios_in_detach == 1)
289  {
290  io_active_result = ! m_block_map.empty();
291  }
292  else
293  {
294  io_active_result = io->m_active_prefetches > 0;
295  }
296 
297  if ( ! io_active_result)
298  {
299  ++m_ios_in_detach;
300  }
301 
302  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
303 
304  return io_active_result;
305  }
306  else
307  {
308  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
309  return false;
310  }
311  }
312 }
313 
314 //------------------------------------------------------------------------------
315 
317 {
318  XrdSysCondVarHelper _lck(m_state_cond);
319  m_detach_time_logged = false;
320 }
321 
323 {
324  // Returns true if sync is required.
325  // This method is called after corresponding IO is detached from PosixCache.
326 
327  XrdSysCondVarHelper _lck(m_state_cond);
328  if ( ! m_in_shutdown)
329  {
330  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
331  {
332  report_and_merge_delta_stats();
333  m_cfi.WriteIOStatDetach(m_stats);
334  m_detach_time_logged = true;
335  m_in_sync = true;
336  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
337  return true;
338  }
339  }
340  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
341  return false;
342 }
343 
344 //------------------------------------------------------------------------------
345 
346 void File::AddIO(IO *io)
347 {
348  // Called from Cache::GetFile() when a new IO asks for the file.
349 
350  TRACEF(Debug, "AddIO() io = " << (void*)io);
351 
352  time_t now = time(0);
353  std::string loc(io->GetLocation());
354 
355  m_state_cond.Lock();
356 
357  IoSet_i mi = m_io_set.find(io);
358 
359  if (mi == m_io_set.end())
360  {
361  m_io_set.insert(io);
362  io->m_attach_time = now;
363  m_delta_stats.IoAttach();
364 
365  insert_remote_location(loc);
366 
367  if (m_prefetch_state == kStopped)
368  {
369  m_prefetch_state = kOn;
370  cache()->RegisterPrefetchFile(this);
371  }
372  }
373  else
374  {
375  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
376  }
377 
378  m_state_cond.UnLock();
379 }
380 
381 //------------------------------------------------------------------------------
382 
384 {
385  // Called from Cache::ReleaseFile.
386 
387  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
388 
389  time_t now = time(0);
390 
391  m_state_cond.Lock();
392 
393  IoSet_i mi = m_io_set.find(io);
394 
395  if (mi != m_io_set.end())
396  {
397  if (mi == m_current_io)
398  {
399  ++m_current_io;
400  }
401 
402  m_delta_stats.IoDetach(now - io->m_attach_time);
403  m_io_set.erase(mi);
404  --m_ios_in_detach;
405 
406  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
407  {
408  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
409  m_prefetch_state = kStopped;
410  cache()->DeRegisterPrefetchFile(this);
411  }
412  }
413  else
414  {
415  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
416  }
417 
418  m_state_cond.UnLock();
419 }
420 
421 //------------------------------------------------------------------------------
422 
423 bool File::Open()
424 {
425  // Sets errno accordingly.
426 
427  static const char *tpfx = "Open() ";
428 
429  TRACEF(Dump, tpfx << "entered");
430 
431  // Before touching anything, check with ResourceMonitor if a scan is in progress.
432  // This function will wait internally if needed until it is safe to proceed.
433  Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
434 
436 
437  XrdOss &myOss = * Cache::GetInstance().GetOss();
438  const char *myUser = conf.m_username.c_str();
439  XrdOucEnv myEnv;
440  struct stat data_stat, info_stat;
441 
442  std::string ifn = m_filename + Info::s_infoExtension;
443 
444  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
445  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
446 
447  // Create the data file itself.
448  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
449  myEnv.Put("oss.asize", size_str);
450  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
451 
452  int res;
453 
454  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
455  {
456  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
457  errno = -res;
458  return false;
459  }
460 
461  m_data_file = myOss.newFile(myUser);
462  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
463  {
464  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
465  errno = -res;
466  delete m_data_file; m_data_file = 0;
467  return false;
468  }
469 
470  myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
471  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
472  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
473  {
474  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
475  errno = -res;
476  m_data_file->Close(); delete m_data_file; m_data_file = 0;
477  return false;
478  }
479 
480  m_info_file = myOss.newFile(myUser);
481  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
482  {
483  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
484  errno = -res;
485  delete m_info_file; m_info_file = 0;
486  m_data_file->Close(); delete m_data_file; m_data_file = 0;
487  return false;
488  }
489 
490  bool initialize_info_file = true;
491 
492  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
493  {
494  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
495  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
496  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
497 
498  // Check if data file exists and is of reasonable size.
499  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
500  {
501  initialize_info_file = false;
502  } else {
503  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
504  m_cfi.ResetAllAccessStats();
505  m_data_file->Ftruncate(0);
506  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
507  }
508  }
509 
510  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
511  {
512  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
513  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
514  {
515  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
516  initialize_info_file = true;
517  m_cfi.ResetAllAccessStats();
518  m_data_file->Ftruncate(0);
519  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
520  } else {
521  // TODO: If the file is complete, we don't need to reset net cksums.
522  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
523  }
524  }
525 
526  if (initialize_info_file)
527  {
528  m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
529  m_cfi.SetCkSumState(conf.get_cs_Chk());
530  m_cfi.ResetNoCkSumTime();
531  m_cfi.Write(m_info_file, ifn.c_str());
532  m_info_file->Fsync();
533  cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
534  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
535  }
536  else
537  {
538  if (futimens(m_info_file->getFD(), NULL)) {
539  TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
540  }
541  }
542 
543  m_cfi.WriteIOStatAttach();
544  m_state_cond.Lock();
545  m_block_size = m_cfi.GetBufferSize();
546  m_num_blocks = m_cfi.GetNBlocks();
547  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
548 
549  m_data_file->Fstat(&data_stat);
550  m_st_blocks = data_stat.st_blocks;
551 
552  m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
553  constexpr long long MB = 1024 * 1024;
554  m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
555  // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
556  // actual threshold based on return values from register_file_update_stats().
557 
558  m_state_cond.UnLock();
559 
560  return true;
561 }
562 
563 int File::Fstat(struct stat &sbuff)
564 {
565  // Stat on an open file.
566  // Corrects size to actual full size of the file.
567  // Sets atime to 0 if the file is only partially downloaded, in accordance
568  // with pfc.onlyifcached settings.
569  // Called from IO::Fstat() and Cache::Stat() when the file is active.
570  // Returns 0 on success, -errno on error.
571 
572  int res;
573 
574  if ((res = m_data_file->Fstat(&sbuff))) return res;
575 
576  sbuff.st_size = m_file_size;
577 
578  bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
579  if ( ! is_cached)
580  sbuff.st_atime = 0;
581 
582  return 0;
583 }
584 
585 //==============================================================================
586 // Read and helpers
587 //==============================================================================
588 
589 bool File::overlap(int blk, // block to query
590  long long blk_size, //
591  long long req_off, // offset of user request
592  int req_size, // size of user request
593  // output:
594  long long &off, // offset in user buffer
595  long long &blk_off, // offset in block
596  int &size) // size to copy
597 {
598  const long long beg = blk * blk_size;
599  const long long end = beg + blk_size;
600  const long long req_end = req_off + req_size;
601 
602  if (req_off < end && req_end > beg)
603  {
604  const long long ovlp_beg = std::max(beg, req_off);
605  const long long ovlp_end = std::min(end, req_end);
606 
607  off = ovlp_beg - req_off;
608  blk_off = ovlp_beg - beg;
609  size = (int) (ovlp_end - ovlp_beg);
610 
611  assert(size <= blk_size);
612  return true;
613  }
614  else
615  {
616  return false;
617  }
618 }
619 
620 //------------------------------------------------------------------------------
621 
622 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
623 {
624  // Must be called w/ state_cond locked.
625  // Checks on size etc should be done before.
626  //
627  // Reference count is 0 so increase it in calling function if you want to
628  // catch the block while still in memory.
629 
630  const long long off = i * m_block_size;
631  const int last_block = m_num_blocks - 1;
632  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
633 
634  int blk_size, req_size;
635  if (i == last_block) {
636  blk_size = req_size = m_file_size - off;
637  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
638  } else {
639  blk_size = req_size = m_block_size;
640  }
641 
642  Block *b = 0;
643  char *buf = cache()->RequestRAM(req_size);
644 
645  if (buf)
646  {
647  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
648 
649  if (b)
650  {
651  m_block_map[i] = b;
652 
653  // Actual Read request is issued in ProcessBlockRequests().
654 
655  if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
656  {
657  m_prefetch_state = kHold;
658  cache()->DeRegisterPrefetchFile(this);
659  }
660  }
661  else
662  {
663  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
664  }
665  }
666 
667  return b;
668 }
669 
670 void File::ProcessBlockRequest(Block *b)
671 {
672  // This *must not* be called with block_map locked.
673 
675 
676  if (XRD_TRACE What >= TRACE_Dump) {
677  char buf[256];
678  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
679  b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
680  TRACEF(Dump, "ProcessBlockRequest() " << buf);
681  }
682 
683  if (b->req_cksum_net())
684  {
685  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
686  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
687  } else {
688  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
689  }
690 }
691 
692 void File::ProcessBlockRequests(BlockList_t& blks)
693 {
694  // This *must not* be called with block_map locked.
695 
696  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
697  {
698  ProcessBlockRequest(*bi);
699  }
700 }
701 
702 //------------------------------------------------------------------------------
703 
704 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
705 {
706  int n_chunks = ioVec.size();
707  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
708 
709  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
710  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
711 
712  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
713 
714  int pos = 0;
715  while (n_chunks > XrdProto::maxRvecsz) {
716  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
717  pos += XrdProto::maxRvecsz;
718  n_chunks -= XrdProto::maxRvecsz;
719  }
720  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
721 }
722 
723 //------------------------------------------------------------------------------
724 
725 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
726 {
727  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
728 
729  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
730 
731  if (rs < 0)
732  {
733  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
734  return rs;
735  }
736 
737  if (rs != expected_size)
738  {
739  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
740  return -EIO;
741  }
742 
743  return (int) rs;
744 }
745 
746 //------------------------------------------------------------------------------
747 
748 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
749 {
750  // rrc_func is ONLY called from async processing.
751  // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
752  // This streamlines implementation of synchronous IO::Read().
753 
754  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
755 
756  m_state_cond.Lock();
757 
758  if (m_in_shutdown || io->m_in_detach)
759  {
760  m_state_cond.UnLock();
761  return m_in_shutdown ? -ENOENT : -EBADF;
762  }
763 
764  // Shortcut -- file is fully downloaded.
765 
766  if (m_cfi.IsComplete())
767  {
768  m_state_cond.UnLock();
769  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
770  if (ret > 0) {
771  XrdSysCondVarHelper _lck(m_state_cond);
772  m_delta_stats.AddBytesHit(ret);
773  check_delta_stats();
774  }
775  return ret;
776  }
777 
778  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
779 
780  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
781 }
782 
783 //------------------------------------------------------------------------------
784 
785 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
786 {
787  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
788 
789  m_state_cond.Lock();
790 
791  if (m_in_shutdown || io->m_in_detach)
792  {
793  m_state_cond.UnLock();
794  return m_in_shutdown ? -ENOENT : -EBADF;
795  }
796 
797  // Shortcut -- file is fully downloaded.
798 
799  if (m_cfi.IsComplete())
800  {
801  m_state_cond.UnLock();
802  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
803  if (ret > 0) {
804  XrdSysCondVarHelper _lck(m_state_cond);
805  m_delta_stats.AddBytesHit(ret);
806  check_delta_stats();
807  }
808  return ret;
809  }
810 
811  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
812 }
813 
814 //------------------------------------------------------------------------------
815 
816 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
817  ReadReqRH *rh, const char *tpfx)
818 {
819  // Non-trivial processing for Read and ReadV.
820  // Entered under lock.
821  //
822  // loop over reqired blocks:
823  // - if on disk, ok;
824  // - if in ram or incoming, inc ref-count
825  // - otherwise request and inc ref count (unless RAM full => request direct)
826  // unlock
827 
828  int prefetch_cnt = 0;
829 
830  ReadRequest *read_req = nullptr;
831  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
832 
833  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
834 
835  std::vector<XrdOucIOVec> iovec_disk;
836  std::vector<XrdOucIOVec> iovec_direct;
837  int iovec_disk_total = 0;
838  int iovec_direct_total = 0;
839 
840  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
841  {
842  const XrdOucIOVec &iov = readV[iov_idx];
843  long long iUserOff = iov.offset;
844  int iUserSize = iov.size;
845  char *iUserBuff = iov.data;
846 
847  const int idx_first = iUserOff / m_block_size;
848  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
849 
850  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
851 
852  enum LastBlock_e { LB_other, LB_disk, LB_direct };
853 
854  LastBlock_e lbe = LB_other;
855 
856  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
857  {
858  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
859  BlockMap_i bi = m_block_map.find(block_idx);
860 
861  // overlap and read
862  long long off; // offset in user buffer
863  long long blk_off; // offset in block
864  int size; // size to copy
865 
866  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
867 
868  // In RAM or incoming?
869  if (bi != m_block_map.end())
870  {
871  inc_ref_count(bi->second);
872  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
873 
874  if (bi->second->is_finished())
875  {
876  // note, blocks with error should not be here !!!
877  // they should be either removed or reissued in ProcessBlockResponse()
878  assert(bi->second->is_ok());
879 
880  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
881 
882  if (bi->second->m_prefetch)
883  ++prefetch_cnt;
884  }
885  else
886  {
887  if ( ! read_req)
888  read_req = new ReadRequest(io, rh);
889 
890  // We have a lock on state_cond --> as we register the request before releasing the lock,
891  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
892 
893  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
894  ++read_req->m_n_chunk_reqs;
895  }
896 
897  lbe = LB_other;
898  }
899  // On disk?
900  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
901  {
902  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
903 
904  if (lbe == LB_disk)
905  iovec_disk.back().size += size;
906  else
907  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
908  iovec_disk_total += size;
909 
910  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
911  ++prefetch_cnt;
912 
913  lbe = LB_disk;
914  }
915  // Neither ... then we have to go get it ...
916  else
917  {
918  if ( ! read_req)
919  read_req = new ReadRequest(io, rh);
920 
921  // Is there room for one more RAM Block?
922  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
923  if (b)
924  {
925  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
926  inc_ref_count(b);
927  blks_to_request.push_back(b);
928 
929  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
930  ++read_req->m_n_chunk_reqs;
931 
932  lbe = LB_other;
933  }
934  else // Nope ... read this directly without caching.
935  {
936  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
937 
938  iovec_direct_total += size;
939  read_req->m_direct_done = false;
940 
941  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
942  // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
943  // is determined in the RequestBlocksDirect().
944  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
945  iovec_direct.back().size += size;
946  } else {
947  long long in_offset = block_idx * m_block_size + blk_off;
948  char *out_pos = iUserBuff + off;
949  while (size > XrdProto::maxRVdsz) {
950  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
951  in_offset += XrdProto::maxRVdsz;
952  out_pos += XrdProto::maxRVdsz;
953  size -= XrdProto::maxRVdsz;
954  }
955  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
956  }
957 
958  lbe = LB_direct;
959  }
960  }
961  } // end for over blocks in an IOVec
962  } // end for over readV IOVec
963 
964  inc_prefetch_hit_cnt(prefetch_cnt);
965 
966  m_state_cond.UnLock();
967 
968  // First, send out remote requests for new blocks.
969  if ( ! blks_to_request.empty())
970  {
971  ProcessBlockRequests(blks_to_request);
972  blks_to_request.clear();
973  }
974 
975  // Second, send out remote direct read requests.
976  if ( ! iovec_direct.empty())
977  {
978  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
979 
980  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
981  }
982 
983  // Begin synchronous part where we process data that is already in RAM or on disk.
984 
985  long long bytes_read = 0;
986  int error_cond = 0; // to be set to -errno
987 
988  // Third, process blocks that are available in RAM.
989  if ( ! blks_ready.empty())
990  {
991  for (auto &bvi : blks_ready)
992  {
993  for (auto &cr : bvi.second)
994  {
995  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
996  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
997  bytes_read += cr.m_size;
998  }
999  }
1000  }
1001 
1002  // Fourth, read blocks from disk.
1003  if ( ! iovec_disk.empty())
1004  {
1005  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1006  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1007  if (rc >= 0)
1008  {
1009  bytes_read += rc;
1010  }
1011  else
1012  {
1013  error_cond = rc;
1014  TRACEF(Error, tpfx << "failed read from disk");
1015  }
1016  }
1017 
1018  // End synchronous part -- update with sync stats and determine actual state of this read.
1019  // Note: remote reads might have already finished during disk-read!
1020 
1021  m_state_cond.Lock();
1022 
1023  for (auto &bvi : blks_ready)
1024  dec_ref_count(bvi.first, (int) bvi.second.size());
1025 
1026  if (read_req)
1027  {
1028  read_req->m_bytes_read += bytes_read;
1029  if (error_cond)
1030  read_req->update_error_cond(error_cond);
1031  read_req->m_stats.m_BytesHit += bytes_read;
1032  read_req->m_sync_done = true;
1033 
1034  if (read_req->is_complete())
1035  {
1036  // Almost like FinalizeReadRequest(read_req) -- but no callout!
1037  m_delta_stats.AddReadStats(read_req->m_stats);
1038  check_delta_stats();
1039  m_state_cond.UnLock();
1040 
1041  int ret = read_req->return_value();
1042  delete read_req;
1043  return ret;
1044  }
1045  else
1046  {
1047  m_state_cond.UnLock();
1048  return -EWOULDBLOCK;
1049  }
1050  }
1051  else
1052  {
1053  m_delta_stats.m_BytesHit += bytes_read;
1054  check_delta_stats();
1055  m_state_cond.UnLock();
1056 
1057  // !!! No callout.
1058 
1059  return error_cond ? error_cond : bytes_read;
1060  }
1061 }
1062 
1063 
1064 //==============================================================================
1065 // WriteBlock and Sync
1066 //==============================================================================
1067 
// NOTE(review): the definition header (listing line 1068) is missing from this
// extract; the cross-reference index names it "void WriteBlockToDisk(Block *b)".
//
// Writes the buffer of block b into the local data file and updates the
// cinfo bit-vectors accordingly.
//  - The file offset is the block offset relative to m_offset.
//  - On a failed or short write the error is traced, one reference on the block
//    is dropped under m_state_cond and the block is NOT marked as written.
//  - On success the written bit (and the prefetch bit for prefetched blocks) is
//    set; the synced bit is either set right away or, while a sync is running,
//    the block index is stashed in m_writes_during_sync.
//  - A file sync is scheduled when the file became complete or the number of
//    non-flushed blocks reached the configured m_flushCnt (unless in shutdown).
1069 {
1070  // write block buffer into disk file
1071  long long offset = b->m_offset - m_offset;
1072  long long size = b->get_size();
1073  ssize_t retval;
1074 
 // Nested if/else without braces: the first "else" pairs with has_cksums(),
 // the second with IsCkSumCache(). pgWrite is passed the block's checksum
 // vector when it has one, otherwise a null checksum pointer.
1075  if (m_cfi.IsCkSumCache())
1076  if (b->has_cksums())
1077  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1078  else
1079  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1080  else
1081  retval = m_data_file->Write(b->get_buff(), offset, size);
1082 
 // Covers both a negative error return and a short write.
1083  if (retval < size)
1084  {
1085  if (retval < 0) {
1086  TRACEF(Error, "WriteToDisk() write error " << retval);
1087  } else {
1088  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1089  }
1090 
1091  XrdSysCondVarHelper _lck(m_state_cond);
1092 
 // Presumably releases the reference taken for the writer in
 // ProcessBlockResponse() -- confirm against dec_ref_count().
1093  dec_ref_count(b);
1094 
1095  return;
1096  }
1097 
 // Index of the block relative to the start of this File (m_offset).
1098  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1099 
1100  // Set written bit.
1101  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1102 
1103  bool schedule_sync = false;
1104  {
1105  XrdSysCondVarHelper _lck(m_state_cond);
1106 
1107  m_cfi.SetBitWritten(blk_idx);
1108 
1109  if (b->m_prefetch)
1110  {
1111  m_cfi.SetBitPrefetch(blk_idx);
1112  }
 // A block that required network checksums but arrived without them resets
 // the net-checksum state recorded in the cinfo.
1113  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1114  {
1115  m_cfi.ResetCkSumNet();
1116  }
1117 
1118  // Set synced bit or stash block index if in actual sync.
1119  // Synced state is only written out to cinfo file when data file is synced.
1120  if (m_in_sync)
1121  {
1122  m_writes_during_sync.push_back(blk_idx);
1123  }
1124  else
1125  {
1126  m_cfi.SetBitSynced(blk_idx);
1127  ++m_non_flushed_cnt;
1128  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1129  ! m_in_shutdown)
1130  {
 // m_in_sync is raised here, under lock, so that later writers stash
 // their block indices instead of scheduling a second sync.
1131  schedule_sync = true;
1132  m_in_sync = true;
1133  m_non_flushed_cnt = 0;
1134  }
1135  }
1136  // As soon as the reference count is decreased on the block, the
1137  // file object may be deleted. Thus, to avoid holding both locks at a time,
1138  // we defer the ref count decrease until later if a sync is needed
1139  if (!schedule_sync) {
1140  dec_ref_count(b);
1141  }
1142  }
1143 
1144  if (schedule_sync)
1145  {
 // Scheduling happens outside of m_state_cond; the lock is re-taken only
 // for the deferred reference drop.
1146  cache()->ScheduleFileSync(this);
1147  XrdSysCondVarHelper _lck(m_state_cond);
1148  dec_ref_count(b);
1149  }
1150 }
1151 
1152 //------------------------------------------------------------------------------
1153 
// NOTE(review): the definition header (listing line 1154) is missing from this
// extract; the cross-reference index names it "void Sync()".
//
// Flushes the data file to disk and, if that succeeds, writes the accumulated
// IO statistics and the cinfo structure to the info file and flushes it too.
// On any fsync failure the local files are unlinked and the sync state is
// reset. On success, blocks written while the sync was running get their
// synced bit set; if such writes completed the file, Sync() is run once more
// without clearing m_in_sync.
1155 {
1156  TRACEF(Dump, "Sync()");
1157 
1158  int ret = m_data_file->Fsync();
1159  bool errorp = false;
1160  if (ret == XrdOssOK)
1161  {
1162  Stats loc_stats;
1163  {
 // Take a snapshot of the merged stats under lock; the cinfo write below
 // is done without holding m_state_cond.
1164  XrdSysCondVarHelper _lck(&m_state_cond);
1165  report_and_merge_delta_stats();
1166  loc_stats = m_stats;
1167  }
1168  m_cfi.WriteIOStat(loc_stats);
 // NOTE(review): the return value of Write() is not checked here; only the
 // following Fsync() result decides whether the sync is considered failed.
1169  m_cfi.Write(m_info_file, m_filename.c_str());
1170  int cret = m_info_file->Fsync();
1171  if (cret != XrdOssOK)
1172  {
1173  TRACEF(Error, "Sync cinfo file sync error " << cret);
1174  errorp = true;
1175  }
1176  }
1177  else
1178  {
1179  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1180  errorp = true;
1181  }
1182 
1183  if (errorp)
1184  {
1185  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1186 
1187  // Unlink will also call this->initiate_emergency_shutdown()
1188  Cache::GetInstance().UnlinkFile(m_filename, false);
1189 
1190  XrdSysCondVarHelper _lck(&m_state_cond);
1191 
 // Stashed block indices are dropped without setting their synced bits.
1192  m_writes_during_sync.clear();
1193  m_in_sync = false;
1194 
1195  return;
1196  }
1197 
1198  int written_while_in_sync;
1199  bool resync = false;
1200  {
1201  XrdSysCondVarHelper _lck(&m_state_cond);
 // Blocks written during the sync were only stashed by the writer; mark
 // them as synced now.
1202  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1203  {
1204  m_cfi.SetBitSynced(*i);
1205  }
 // These blocks count as non-flushed for the next flush-count check.
1206  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1207  m_writes_during_sync.clear();
1208 
1209  // If there were writes during sync and the file is now complete,
1210  // let us call Sync again without resetting the m_in_sync flag.
1211  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1212  resync = true;
1213  else
1214  m_in_sync = false;
1215  }
1216  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1217 
1218  if (resync)
1219  Sync();
1220 }
1221 
1222 
1223 //==============================================================================
1224 // Block processing
1225 //==============================================================================
1226 
1227 void File::free_block(Block* b)
1228 {
1229  // Method always called under lock.
1230  int i = b->m_offset / m_block_size;
1231  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1232  size_t ret = m_block_map.erase(i);
1233  if (ret != 1)
1234  {
1235  // assert might be a better option than a warning
1236  TRACEF(Error, "free_block did not erase " << i << " from map");
1237  }
1238  else
1239  {
1240  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1241  delete b;
1242  }
1243 
1244  if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1245  {
1246  m_prefetch_state = kOn;
1247  cache()->RegisterPrefetchFile(this);
1248  }
1249 }
1250 
1251 //------------------------------------------------------------------------------
1252 
1253 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1254 {
1255  // Method always called under lock. It also expects prefetch to be active.
1256 
1257  int io_size = (int) m_io_set.size();
1258  bool io_ok = false;
1259 
1260  if (io_size == 1)
1261  {
1262  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1263  if (io_ok)
1264  {
1265  m_current_io = m_io_set.begin();
1266  }
1267  }
1268  else if (io_size > 1)
1269  {
1270  IoSet_i mi = m_current_io;
1271  if (skip_current && mi != m_io_set.end()) ++mi;
1272 
1273  for (int i = 0; i < io_size; ++i)
1274  {
1275  if (mi == m_io_set.end()) mi = m_io_set.begin();
1276 
1277  if ((*mi)->m_allow_prefetching)
1278  {
1279  m_current_io = mi;
1280  io_ok = true;
1281  break;
1282  }
1283  ++mi;
1284  }
1285  }
1286 
1287  if ( ! io_ok)
1288  {
1289  m_current_io = m_io_set.end();
1290  m_prefetch_state = kStopped;
1291  cache()->DeRegisterPrefetchFile(this);
1292  }
1293 
1294  return io_ok;
1295 }
1296 
1297 //------------------------------------------------------------------------------
1298 
1299 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1300 {
1301  // Called from DirectResponseHandler.
1302  // NOT under lock.
1303 
1304  if (error_cond)
1305  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1306 
1307  m_state_cond.Lock();
1308 
1309  if (error_cond)
1310  rreq->update_error_cond(error_cond);
1311  else {
1312  rreq->m_stats.m_BytesBypassed += bytes_read;
1313  rreq->m_bytes_read += bytes_read;
1314  }
1315 
1316  rreq->m_direct_done = true;
1317 
1318  bool rreq_complete = rreq->is_complete();
1319 
1320  m_state_cond.UnLock();
1321 
1322  if (rreq_complete)
1323  FinalizeReadRequest(rreq);
1324 }
1325 
1326 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1327 {
1328  // Called from ProcessBlockResponse().
1329  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1330  // Does not manage m_read_req.
1331  // Will not complete the request.
1332 
1333  TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1334  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1335 
1336  rreq->update_error_cond(b->get_error());
1337  --rreq->m_n_chunk_reqs;
1338 
1339  dec_ref_count(b);
1340 }
1341 
1342 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1343 {
1344  // Called from ProcessBlockResponse().
1345  // NOT under lock as it does memcopy ofor exisf block data.
1346  // Acquires lock for block, m_read_req and rreq state update.
1347 
1348  ReadRequest *rreq = creq.m_read_req;
1349 
1350  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1351  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1352 
1353  m_state_cond.Lock();
1354 
1355  rreq->m_bytes_read += creq.m_size;
1356 
1357  if (b->get_req_id() == (void*) rreq)
1358  rreq->m_stats.m_BytesMissed += creq.m_size;
1359  else
1360  rreq->m_stats.m_BytesHit += creq.m_size;
1361 
1362  --rreq->m_n_chunk_reqs;
1363 
1364  if (b->m_prefetch)
1365  inc_prefetch_hit_cnt(1);
1366 
1367  dec_ref_count(b);
1368 
1369  bool rreq_complete = rreq->is_complete();
1370 
1371  m_state_cond.UnLock();
1372 
1373  if (rreq_complete)
1374  FinalizeReadRequest(rreq);
1375 }
1376 
1377 void File::FinalizeReadRequest(ReadRequest *rreq)
1378 {
1379  // called from ProcessBlockResponse()
1380  // NOT under lock -- does callout
1381  {
1382  XrdSysCondVarHelper _lck(m_state_cond);
1383  m_delta_stats.AddReadStats(rreq->m_stats);
1384  check_delta_stats();
1385  }
1386 
1387  rreq->m_rh->Done(rreq->return_value());
1388  delete rreq;
1389 }
1390 
// Handles completion of a remote read for block b. res is the number of bytes
// received or a negative error code.
//  - A non-negative res that differs from b->get_size() is treated as a
//    remote/local file size mismatch and the local files are unlinked.
//  - For prefetch blocks whose IO is still registered in m_io_set, the IO's
//    active-prefetch counter is decremented; on error, prefetching is disabled
//    on that IO. An unreferenced block is freed right away on error or shutdown.
//  - Full-size result: the block is marked downloaded, queued for writing
//    (unless in shutdown) and all pending chunk requests are served outside of
//    the lock.
//  - Otherwise: the error is stored on the block, chunk requests issued through
//    the failing IO are errored out, and the block request is reissued through
//    the IO of the first remaining chunk request, if there is one.
1391 void File::ProcessBlockResponse(Block *b, int res)
1392 {
1393  static const char* tpfx = "ProcessBlockResponse ";
1394 
1395  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1396 
1397  if (res >= 0 && res != b->get_size())
1398  {
1399  // Incorrect number of bytes received, apparently size of the file on the remote
1400  // is different than what the cache expects it to be.
1401  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1402  Cache::GetInstance().UnlinkFile(m_filename, false);
1403  }
1404 
 // From here on the state lock is held; each branch below releases it
 // explicitly before doing callouts or returning.
1405  m_state_cond.Lock();
1406 
1407  // Deregister block from IO's prefetch count, if needed.
1408  if (b->m_prefetch)
1409  {
1410  IO *io = b->get_io();
1411  IoSet_i mi = m_io_set.find(io);
1412  if (mi != m_io_set.end())
1413  {
1414  --io->m_active_prefetches;
1415 
1416  // If failed and IO is still prefetching -- disable prefetching on this IO.
1417  if (res < 0 && io->m_allow_prefetching)
1418  {
1419  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1420  io->m_allow_prefetching = false;
1421 
1422  // Check if any IO is still available for prefetching. If not, stop it.
1423  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1424  {
1425  if ( ! select_current_io_or_disable_prefetching(false) )
1426  {
1427  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1428  }
1429  }
1430  }
1431 
1432  // If failed with no subscribers -- delete the block and exit.
1433  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1434  {
1435  free_block(b);
1436  m_state_cond.UnLock();
1437  return;
1438  }
1439  m_prefetch_bytes += b->get_size();
1440  }
1441  else
1442  {
1443  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1444  }
1445  }
1446 
1447  if (res == b->get_size())
1448  {
1449  b->set_downloaded();
1450  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1451  if ( ! m_in_shutdown)
1452  {
1453  // Increase ref-count for the writer.
1454  inc_ref_count(b);
1455  m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1456  // No check for writes, report-and-merge forced during Sync().
1457  cache()->AddWriteTask(b, true);
1458  }
1459 
1460  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1461  vChunkRequest_t creqs_to_notify;
1462  creqs_to_notify.swap( b->m_chunk_reqs );
1463 
1464  m_state_cond.UnLock();
1465 
 // ProcessBlockSuccess() copies the data and re-acquires the lock itself.
1466  for (auto &creq : creqs_to_notify)
1467  {
1468  ProcessBlockSuccess(b, creq);
1469  }
1470  }
1471  else
1472  {
 // The first error / incomplete read registered on an IO is traced at
 // Error level, later ones at Debug level.
1473  if (res < 0) {
1474  bool new_error = b->get_io()->register_block_error(res);
1475  int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1476  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1477  << ", io=" << b->get_io() << ", error=" << res);
1478  } else {
1479  bool first_p = b->get_io()->register_incomplete_read();
1480  int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1481  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1482  << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
 // A short read is converted into an error code; platforms selected by
 // the condition below use EIO, all others EREMOTEIO.
1483 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1484  res = -EIO;
1485 #else
1486  res = -EREMOTEIO;
1487 #endif
1488  }
1489  b->set_error(res);
1490 
1491  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1492  // Collect others with a different IO, the first of them will be used to reissue the request.
1493  // This is then done outside of lock.
1494  std::list<ReadRequest*> rreqs_to_complete;
1495  vChunkRequest_t creqs_to_keep;
1496 
 // NOTE(review): ProcessBlockError() calls dec_ref_count(b) while this loop
 // iterates over b->m_chunk_reqs -- confirm the block cannot be released
 // before the loop has finished.
1497  for(ChunkRequest &creq : b->m_chunk_reqs)
1498  {
1499  ReadRequest *rreq = creq.m_read_req;
1500 
1501  if (rreq->m_io == b->get_io())
1502  {
1503  ProcessBlockError(b, rreq);
1504  if (rreq->is_complete())
1505  {
1506  rreqs_to_complete.push_back(rreq);
1507  }
1508  }
1509  else
1510  {
1511  creqs_to_keep.push_back(creq);
1512  }
1513  }
1514 
1515  bool reissue = false;
1516  if ( ! creqs_to_keep.empty())
1517  {
1518  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1519 
1520  TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1521  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1522 
 // The block is handed over to the IO of the first surviving chunk
 // request and keeps only the chunk requests that were not errored out.
1523  b->reset_error_and_set_io(rreq->m_io, rreq);
1524  b->m_chunk_reqs.swap( creqs_to_keep );
1525  reissue = true;
1526  }
1527 
1528  m_state_cond.UnLock();
1529 
 // Callouts and the reissued request are done without holding the lock.
1530  for (auto rreq : rreqs_to_complete)
1531  FinalizeReadRequest(rreq);
1532 
1533  if (reissue)
1534  ProcessBlockRequest(b);
1535  }
1536 }
1537 
1538 //------------------------------------------------------------------------------
1539 
1540 const char* File::lPath() const
1541 {
1542  return m_filename.c_str();
1543 }
1544 
1545 //------------------------------------------------------------------------------
1546 
1547 int File::offsetIdx(int iIdx) const
1548 {
1549  return iIdx - m_offset/m_block_size;
1550 }
1551 
1552 
1553 //------------------------------------------------------------------------------
1554 
// NOTE(review): the definition header (listing line 1555) is missing from this
// extract; the cross-reference index names it "void Prefetch()".
//
// Issues a prefetch request for the first block (in index order) that is
// neither written to disk nor present in m_block_map, using the IO selected by
// select_current_io_or_disable_prefetching(). Does nothing unless
// m_prefetch_state is kOn. When no block was prepared, the prefetch state is
// set to kComplete and the file is deregistered from prefetching.
1556 {
1557  // Check that block is not on disk and not in RAM.
1558  // TODO: Could prefetch several blocks at once!
1559  // blks_max could be an argument
1560 
1561  BlockList_t blks;
1562 
1563  TRACEF(DumpXL, "Prefetch() entering.");
1564  {
1565  XrdSysCondVarHelper _lck(m_state_cond);
1566 
1567  if (m_prefetch_state != kOn)
1568  {
1569  return;
1570  }
1571 
 // skip_current = true: start the search after the current IO, wrapping
 // around if needed.
1572  if ( ! select_current_io_or_disable_prefetching(true) )
1573  {
1574  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1575  return;
1576  }
1577 
1578  // Select block(s) to fetch.
1579  for (int f = 0; f < m_num_blocks; ++f)
1580  {
1581  if ( ! m_cfi.TestBitWritten(f))
1582  {
 // f is relative to this File's first block; m_block_map is keyed
 // by the index that includes the m_offset shift.
1583  int f_act = f + m_offset / m_block_size;
1584 
1585  BlockMap_i bi = m_block_map.find(f_act);
1586  if (bi == m_block_map.end())
1587  {
1588  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1589  if (b)
1590  {
1591  TRACEF(Dump, "Prefetch take block " << f_act);
1592  blks.push_back(b);
1593  // Note: block ref_cnt not increased, it will be when placed into write queue.
1594 
1595  inc_prefetch_read_cnt(1);
1596  }
1597  else
1598  {
1599  // This shouldn't happen as prefetching stops when RAM is 70% full.
1600  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1601  }
 // Only one candidate block is considered per call.
1602  break;
1603  }
1604  }
1605  }
1606 
 // NOTE(review): blks is also empty when PrepareBlockRequest() failed above,
 // in which case the file is marked kComplete although a block is still
 // missing -- confirm this is intended.
1607  if (blks.empty())
1608  {
1609  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1610  m_prefetch_state = kComplete;
1611  cache()->DeRegisterPrefetchFile(this);
1612  }
1613  else
1614  {
1615  (*m_current_io)->m_active_prefetches += (int) blks.size();
1616  }
1617  }
1618 
 // The remote request is sent after the lock has been released.
1619  if ( ! blks.empty())
1620  {
1621  ProcessBlockRequests(blks);
1622  }
1623 }
1624 
1625 
1626 //------------------------------------------------------------------------------
1627 
// NOTE(review): the definition header (listing line 1628) is missing from this
// extract; the cross-reference index names it "float GetPrefetchScore() const".
// Returns the current value of m_prefetch_score.
1629 {
1630  return m_prefetch_score;
1631 }
1632 
// NOTE(review): the definition header (listing line 1633) is missing from this
// extract; the cross-reference index names it "XrdSysError * GetLog()".
// Returns the logger of the Cache singleton.
1634 {
1635  return Cache::GetInstance().GetLog();
1636 }
1637 
// NOTE(review): the definition header (listing line 1638) is missing from this
// extract; the cross-reference index names it "XrdSysTrace * GetTrace()".
// Returns the trace object of the Cache singleton.
1639 {
1640  return Cache::GetInstance().GetTrace();
1641 }
1642 
1643 void File::insert_remote_location(const std::string &loc)
1644 {
1645  if ( ! loc.empty())
1646  {
1647  size_t p = loc.find_first_of('@');
1648  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1649  }
1650 }
1651 
1652 std::string File::GetRemoteLocations() const
1653 {
1654  std::string s;
1655  if ( ! m_remote_locations.empty())
1656  {
1657  size_t sl = 0;
1658  int nl = 0;
1659  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1660  {
1661  sl += i->size();
1662  }
1663  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1664  s = '[';
1665  int j = 1;
1666  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1667  {
1668  s += '"'; s += *i; s += '"';
1669  if (j < nl) s += ',';
1670  }
1671  s += ']';
1672  }
1673  else
1674  {
1675  s = "[]";
1676  }
1677  return s;
1678 }
1679 
1680 //==============================================================================
1681 //======================= RESPONSE HANDLERS ==============================
1682 //==============================================================================
1683 
// NOTE(review): the definition header (listing line 1684) is missing from this
// extract; the cross-reference index lists a "void Done(int result) override"
// at that line -- presumably BlockResponseHandler::Done, confirm.
// Forwards the result of a block read to the owning File and then destroys
// this handler; the object must not be used after the call.
1685 {
1686  m_block->m_file->ProcessBlockResponse(m_block, res);
1687  delete this;
1688 }
1689 
1690 //------------------------------------------------------------------------------
1691 
// NOTE(review): the definition header (listing line 1692) is missing from this
// extract; the cross-reference index lists a "void Done(int result) override"
// at that line -- presumably DirectResponseHandler::Done, confirm.
// Collects one response of a direct read: counts down the number of
// outstanding responses, remembers the first error or accumulates the bytes
// read. When the last response has arrived, the File is notified and this
// handler destroys itself.
1693 {
1694  m_mutex.Lock();
1695 
 // Copy the remaining count while still under the mutex; it is used after
 // the mutex has been released.
1696  int n_left = --m_to_wait;
1697 
1698  if (res < 0) {
1699  if (m_errno == 0) m_errno = res; // store first reported error
1700  } else {
1701  m_bytes_read += res;
1702  }
1703 
1704  m_mutex.UnLock();
1705 
1706  if (n_left == 0)
1707  {
1708  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1709  delete this;
1710  }
1711 }
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
#define TRACE_Error
Definition: XrdPfcTrace.hh:7
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACEF(act, x)
Definition: XrdPfcTrace.hh:67
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
#define TRACEF_INT(act, x)
Definition: XrdPfcTrace.hh:71
int stat(const char *path, struct stat *buf)
#define XRD_TRACE
Definition: XrdScheduler.cc:48
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Fsync()
Definition: XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:164
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:136
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
Definition: XrdOucCache.cc:86
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void Done(int result) override
Definition: XrdPfcFile.cc:1684
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:163
int get_size() const
Definition: XrdPfcFile.hh:136
int get_error() const
Definition: XrdPfcFile.hh:150
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:162
void * get_req_id() const
Definition: XrdPfcFile.hh:142
long long get_offset() const
Definition: XrdPfcFile.hh:138
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:125
void set_error(int err)
Definition: XrdPfcFile.hh:149
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:161
char * get_buff() const
Definition: XrdPfcFile.hh:135
IO * get_io() const
Definition: XrdPfcFile.hh:141
void set_downloaded()
Definition: XrdPfcFile.hh:148
bool req_cksum_net() const
Definition: XrdPfcFile.hh:159
bool has_cksums() const
Definition: XrdPfcFile.hh:160
long long m_offset
Definition: XrdPfcFile.hh:114
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:152
int get_req_size() const
Definition: XrdPfcFile.hh:137
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:154
XrdOss * GetOss() const
Definition: XrdPfc.hh:270
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:285
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:206
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:135
XrdSysError * GetLog()
Definition: XrdPfc.hh:284
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:132
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1195
void Done(int result) override
Definition: XrdPfcFile.cc:1692
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:322
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1540
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
Definition: XrdPfcFile.cc:785
XrdSysTrace * GetTrace()
Definition: XrdPfcFile.cc:1638
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:1068
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:138
float GetPrefetchScore() const
Definition: XrdPfcFile.cc:1628
friend class BlockResponseHandler
Definition: XrdPfcFile.hh:205
XrdSysError * GetLog()
Definition: XrdPfcFile.cc:1633
void Prefetch()
Definition: XrdPfcFile.cc:1555
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1652
int Fstat(struct stat &sbuff)
Definition: XrdPfcFile.cc:563
void AddIO(IO *io)
Definition: XrdPfcFile.cc:346
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
Definition: XrdPfcFile.cc:316
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:216
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:206
void Sync()
Sync file cache info and output data with disk.
Definition: XrdPfcFile.cc:1154
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
Definition: XrdPfcFile.cc:748
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
Definition: XrdPfcFile.cc:230
long long initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:151
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:383
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Definition: XrdPfcFile.cc:208
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Definition: XrdPfcFile.cc:239
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
bool register_incomplete_read()
Definition: XrdPfcIO.hh:90
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
bool register_block_error(int res)
Definition: XrdPfcIO.hh:93
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:70
const char * GetLocation()
Definition: XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
Definition: XrdPfcInfo.hh:365
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void SetBitSynced(int i)
Mark block as synced to disk.
Definition: XrdPfcInfo.hh:387
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:301
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:286
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
Definition: XrdPfcInfo.cc:420
void ResetCkSumNet()
Definition: XrdPfcInfo.cc:213
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:266
void DowngradeCkSumState(CkSumCheck_e css_ref)
Definition: XrdPfcInfo.hh:295
bool IsCkSumNet() const
Definition: XrdPfcInfo.hh:290
void ResetAllAccessStats()
Reset IO Stats.
Definition: XrdPfcInfo.cc:359
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
Definition: XrdPfcInfo.hh:376
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
bool IsCkSumCache() const
Definition: XrdPfcInfo.hh:289
void SetBitWritten(int i)
Mark block as written to disk.
Definition: XrdPfcInfo.hh:352
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:429
long long GetExpectedDataFileSize() const
Get expected data file size.
Definition: XrdPfcInfo.hh:420
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
Definition: XrdPfcInfo.hh:343
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:294
void SetCkSumState(CkSumCheck_e css)
Definition: XrdPfcInfo.hh:294
void ResetNoCkSumTime()
Definition: XrdPfcInfo.hh:302
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:161
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:438
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
void IoAttach()
Definition: XrdPfcStats.hh:85
void AddReadStats(const Stats &s)
Definition: XrdPfcStats.hh:67
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:41
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:119
void AddWriteStats(long long bytes_written, int n_cks_errs)
Definition: XrdPfcStats.hh:79
long long BytesReadAndWritten() const
Definition: XrdPfcStats.hh:102
void AddBytesHit(long long bh)
Definition: XrdPfcStats.hh:74
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:39
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
void IoDetach(int duration)
Definition: XrdPfcStats.hh:90
@ Warning
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, uint16_t timeout=0)
Factory for creating ReadImpl objects.
Definition: XrdPfc.hh:41
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:101
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:166
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:167
static const int maxRVdsz
Definition: XProtocol.hh:688
static const int maxRvecsz
Definition: XProtocol.hh:686
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
char * data
Definition: XrdOucIOVec.hh:45
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:117
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:80
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:73
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:114
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:89
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:109
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:90
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:88
unsigned short m_seq_id
Definition: XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:81
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:66
bool is_complete() const
Definition: XrdPfcFile.hh:83
int return_value() const
Definition: XrdPfcFile.hh:84
long long m_bytes_read
Definition: XrdPfcFile.hh:68