XRootD
XrdPfcFile.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 
20 #include "XrdPfcFile.hh"
21 #include "XrdPfc.hh"
22 #include "XrdPfcResourceMonitor.hh"
23 #include "XrdPfcIO.hh"
24 #include "XrdPfcTrace.hh"
25 
26 #include "XProtocol/XProtocol.hh"
27 #include "XrdSys/XrdSysTimer.hh"
28 #include "XrdOss/XrdOss.hh"
29 #include "XrdOuc/XrdOucEnv.hh"
30 #include "XrdOuc/XrdOucJson.hh"
32 
33 #include "XrdCl/XrdClURL.hh"
35 
36 #include <cassert>
37 #include <cstdio>
38 #include <sstream>
39 #include <unordered_map>
40 
41 #include <fcntl.h>
42 
43 using namespace XrdPfc;
44 
45 namespace
46 {
47 
48 const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49 
50 Cache* cache() { return &Cache::GetInstance(); }
51 
52 }
53 
54 const char *File::m_traceID = "File";
55 
56 //------------------------------------------------------------------------------
57 
58 File::File(const std::string& path, long long iOffset, long long iFileSize) :
59  m_ref_cnt(0),
60  m_data_file(0),
61  m_info_file(0),
62  m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
63  m_filename(path),
64  m_offset(iOffset),
65  m_file_size(iFileSize),
66  m_current_io(m_io_set.end()),
67  m_ios_in_detach(0),
68  m_non_flushed_cnt(0),
69  m_in_sync(false),
70  m_detach_time_logged(false),
71  m_in_shutdown(false),
72  m_state_cond(0),
73  m_block_size(0),
74  m_num_blocks(0),
75  m_resmon_token(-1),
76  m_prefetch_state(kOff),
77  m_prefetch_bytes(0),
78  m_prefetch_read_cnt(0),
79  m_prefetch_hit_cnt(0),
80  m_prefetch_score(0)
81 {}
82 
83 File::~File()
84 {
85  TRACEF(Debug, "~File() for ");
86 }
87 
88 void File::Close()
89 {
90  // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
91  // A stat is called after close to re-check that m_stat_blocks have been reported correctly
92  // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
93  // in report_and_merge_delta_stats() below.
94  //
95  // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
96  // get reported in as stat.st_blocks.
97  // The reported number is correct in a stat immediately following a close.
98  // If one starts off by writing the last byte of the file, this pre-allocation does not get
99  // triggered up to that point. But comes back with a vengeance right after.
100  //
101  // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.
102 
103  if (m_info_file)
104  {
105  TRACEF(Debug, "Close() closing info-file ");
106  m_info_file->Close();
107  delete m_info_file;
108  m_info_file = nullptr;
109  }
110 
111  if (m_data_file)
112  {
113  TRACEF(Debug, "Close() closing data-file ");
114  m_data_file->Close();
115  delete m_data_file;
116  m_data_file = nullptr;
117  }
118 
119  if (m_resmon_token >= 0)
120  {
121  // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
122  // but in this case the file will get unlinked by the cache and reported as purge event.
123  // We check if the reported st_blocks so far is correct.
124  if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
125  struct stat s;
126  int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
127  if (sr == 0 && s.st_blocks != m_st_blocks) {
128  Stats stats;
129  stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
130  m_st_blocks = s.st_blocks;
131  Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
132  }
133  }
134 
135  Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
136  }
137 
138  TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
139 }
140 
141 //------------------------------------------------------------------------------
142 
143 File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
144 {
145  File *file = new File(path, offset, fileSize);
146  if ( ! file->Open(inputIO))
147  {
148  delete file;
149  file = 0;
150  }
151  return file;
152 }
153 
154 //------------------------------------------------------------------------------
155 
// NOTE(review): the signature line of this function is not visible in this
// listing. The body puts the file into shutdown state, stops prefetching,
// flushes pending stats and returns m_st_blocks.
157 {
158  // Called from Cache::Unlink() when the file is currently open.
159  // Cache::Unlink is also called on FSync error and when wrong number of bytes
160  // is received from a remote read.
161  //
162  // From this point onward the file will not be written to, cinfo file will
163  // not be updated, and all new read requests will return -ENOENT.
164  //
165  // File's entry in the Cache's active map is set to nullptr and will be
166  // removed from there shortly, in any case, well before this File object
167  // shuts down. Cache::Unlink() also reports the appropriate purge event.
168 
   // The state lock is held for the whole sequence below.
169  XrdSysCondVarHelper _lck(m_state_cond);
170 
171  m_in_shutdown = true;
172 
   // Unless prefetching is already stopped or complete, stop it and
   // de-register this file from the cache's prefetch handling.
173  if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
174  {
175  m_prefetch_state = kStopped;
176  cache()->DeRegisterPrefetchFile(this);
177  }
178 
   // Push the accumulated delta stats to the resource monitor; this also
   // refreshes m_st_blocks, which is returned below.
179  report_and_merge_delta_stats();
180 
181  return m_st_blocks;
182 }
183 
184 //------------------------------------------------------------------------------
185 
186 void File::check_delta_stats()
187 {
188  // Called under m_state_cond lock.
189  // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
190  if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
191  report_and_merge_delta_stats();
192 }
193 
194 void File::report_and_merge_delta_stats()
195 {
196  // Called under m_state_cond lock.
197  struct stat s;
198  m_data_file->Fstat(&s);
199  // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
200  // aggressive pre-allocation in this field (XFS, 4GB).
201  long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
202  : m_file_size >> 9;
203  long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
204  m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
205  m_st_blocks = st_blocks_to_report;
206  Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
207  m_stats.AddUp(m_delta_stats);
208  m_delta_stats.Reset();
209 }
210 
211 //------------------------------------------------------------------------------
212 
// NOTE(review): the signature line is not visible in this listing; per the
// trace text this is BlockRemovedFromWriteQ() and `b` is the affected block.
// Drops one reference on the block while holding the state lock.
214 {
215  TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
216 
   // dec_ref_count() is invoked with m_state_cond held.
217  XrdSysCondVarHelper _lck(m_state_cond);
218  dec_ref_count(b);
219 }
220 
221 void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
222 {
223  TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
224 
225  XrdSysCondVarHelper _lck(m_state_cond);
226 
227  for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
228  {
229  dec_ref_count(*i);
230  }
231 }
232 
233 //------------------------------------------------------------------------------
234 
235 void File::ioUpdated(IO *io)
236 {
237  std::string loc(io->GetLocation());
238  XrdSysCondVarHelper _lck(m_state_cond);
239  insert_remote_location(loc);
240 }
241 
242 //------------------------------------------------------------------------------
243 
244 bool File::ioActive(IO *io)
245 {
246  // Returns true if delay is needed.
247 
248  TRACEF(Debug, "ioActive start for io " << io);
249 
250  std::string loc(io->GetLocation());
251 
252  {
253  XrdSysCondVarHelper _lck(m_state_cond);
254 
255  IoSet_i mi = m_io_set.find(io);
256 
257  if (mi != m_io_set.end())
258  {
259  unsigned int n_active_reads = io->m_active_read_reqs;
260 
261  TRACE(Info, "ioActive for io " << io <<
262  ", active_reads " << n_active_reads <<
263  ", active_prefetches " << io->m_active_prefetches <<
264  ", allow_prefetching " << io->m_allow_prefetching <<
265  ", ios_in_detach " << m_ios_in_detach);
266  TRACEF(Info,
267  "\tio_map.size() " << m_io_set.size() <<
268  ", block_map.size() " << m_block_map.size() << ", file");
269 
270  insert_remote_location(loc);
271 
272  io->m_allow_prefetching = false;
273  io->m_in_detach = true;
274 
275  // Check if any IO is still available for prfetching. If not, stop it.
276  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
277  {
278  if ( ! select_current_io_or_disable_prefetching(false) )
279  {
280  TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
281  }
282  }
283 
284  // On last IO, consider write queue blocks. Note, this also contains
285  // blocks being prefetched.
286 
287  bool io_active_result;
288 
289  if (n_active_reads > 0)
290  {
291  io_active_result = true;
292  }
293  else if (m_io_set.size() - m_ios_in_detach == 1)
294  {
295  io_active_result = ! m_block_map.empty();
296  }
297  else
298  {
299  io_active_result = io->m_active_prefetches > 0;
300  }
301 
302  if ( ! io_active_result)
303  {
304  ++m_ios_in_detach;
305  }
306 
307  TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
308 
309  return io_active_result;
310  }
311  else
312  {
313  TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
314  return false;
315  }
316  }
317 }
318 
319 //------------------------------------------------------------------------------
320 
// NOTE(review): the signature line is not visible in this listing.
// Clears m_detach_time_logged under the state lock; FinalizeSyncBeforeExit
// treats a cleared flag as a reason to request a sync of detach stats.
322 {
323  XrdSysCondVarHelper _lck(m_state_cond);
324  m_detach_time_logged = false;
325 }
326 
// NOTE(review): the signature line is not visible in this listing; per the
// trace text this is FinalizeSyncBeforeExit().
328 {
329  // Returns true if sync is required.
330  // This method is called after corresponding IO is detached from PosixCache.
331 
332  XrdSysCondVarHelper _lck(m_state_cond);
   // No sync is ever requested once the file is in shutdown.
333  if ( ! m_in_shutdown)
334  {
   // A sync is needed if writes arrived during a sync, if there are
   // non-flushed blocks, or if detach stats have not been logged yet.
335  if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
336  {
   // Bring m_stats up to date before recording the detach stats in the
   // cinfo object, then mark a sync as being in progress.
337  report_and_merge_delta_stats();
338  m_cfi.WriteIOStatDetach(m_stats);
339  m_detach_time_logged = true;
340  m_in_sync = true;
341  TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
342  return true;
343  }
344  }
345  TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
346  return false;
347 }
348 
349 //------------------------------------------------------------------------------
350 
351 void File::AddIO(IO *io)
352 {
353  // Called from Cache::GetFile() when a new IO asks for the file.
354 
355  TRACEF(Debug, "AddIO() io = " << (void*)io);
356 
357  time_t now = time(0);
358  std::string loc(io->GetLocation());
359 
360  m_state_cond.Lock();
361 
362  IoSet_i mi = m_io_set.find(io);
363 
364  if (mi == m_io_set.end())
365  {
366  m_io_set.insert(io);
367  io->m_attach_time = now;
368  m_delta_stats.IoAttach();
369 
370  insert_remote_location(loc);
371 
372  if (m_prefetch_state == kStopped)
373  {
374  m_prefetch_state = kOn;
375  cache()->RegisterPrefetchFile(this);
376  }
377  }
378  else
379  {
380  TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
381  }
382 
383  m_state_cond.UnLock();
384 }
385 
386 //------------------------------------------------------------------------------
387 
388 void File::RemoveIO(IO *io)
389 {
390  // Called from Cache::ReleaseFile.
391 
392  TRACEF(Debug, "RemoveIO() io = " << (void*)io);
393 
394  time_t now = time(0);
395 
396  m_state_cond.Lock();
397 
398  IoSet_i mi = m_io_set.find(io);
399 
400  if (mi != m_io_set.end())
401  {
402  if (mi == m_current_io)
403  {
404  ++m_current_io;
405  }
406 
407  m_delta_stats.IoDetach(now - io->m_attach_time);
408  m_io_set.erase(mi);
409  --m_ios_in_detach;
410 
411  if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
412  {
413  TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
414  m_prefetch_state = kStopped;
415  cache()->DeRegisterPrefetchFile(this);
416  }
417  }
418  else
419  {
420  TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
421  }
422 
423  m_state_cond.UnLock();
424 }
425 
426 //------------------------------------------------------------------------------
427 
// Opens (creating them if needed) the data file and its companion info file
// through the OSS layer, validates or (re)initializes the info record and
// registers the file with the resource monitor.
// Returns true on success. On an OSS create/open failure returns false with
// errno set from the OSS return code, after releasing any handle opened so far.
428 bool File::Open(XrdOucCacheIO *inputIO)
429 {
430  // Sets errno accordingly.
431 
432  static const char *tpfx = "Open() ";
433 
434  TRACEF(Dump, tpfx << "entered");
435 
436  // Before touching anything, check with ResourceMonitor if a scan is in progress.
437  // This function will wait internally if needed until it is safe to proceed.
438  Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
439 
   // NOTE(review): `conf` is used throughout the rest of this function but its
   // declaration is not visible in this listing (a line appears to be missing
   // here) -- confirm against the real source.
441 
442  XrdOss &myOss = * Cache::GetInstance().GetOss();
443  const char *myUser = conf.m_username.c_str();
444  XrdOucEnv myEnv;
445  struct stat data_stat, info_stat;
446 
447  std::string ifn = m_filename + Info::s_infoExtension;
448 
   // Remember whether the files were present before Create() below makes them.
449  bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
450  bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
451 
452  // Create the data file itself.
453  char size_str[32]; sprintf(size_str, "%lld", m_file_size);
454  myEnv.Put("oss.asize", size_str);
455  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
456 
457  int res;
458 
459  if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
460  {
461  TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
462  errno = -res;
463  return false;
464  }
465 
466  m_data_file = myOss.newFile(myUser);
467  if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
468  {
469  TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
470  errno = -res;
471  delete m_data_file; m_data_file = 0;
472  return false;
473  }
474 
   // Same Create + Open sequence for the info file, with the meta-data space
   // as its cgroup.
475  myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
476  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
477  if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
478  {
479  TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
480  errno = -res;
481  m_data_file->Close(); delete m_data_file; m_data_file = 0;
482  return false;
483  }
484 
485  m_info_file = myOss.newFile(myUser);
486  if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
487  {
488  TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
489  errno = -res;
490  delete m_info_file; m_info_file = 0;
491  m_data_file->Close(); delete m_data_file; m_data_file = 0;
492  return false;
493  }
494 
   // The info record is written from scratch unless an existing one is read
   // successfully and passes the checks below.
495  bool initialize_info_file = true;
496 
497  if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
498  {
499  TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
500  ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
501  ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
502  ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");
503 
504  // Check if data file exists and is of reasonable size.
505  if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
506  {
507  initialize_info_file = false;
508  } else {
509  TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
510  m_cfi.ResetAllAccessStats();
511  m_data_file->Ftruncate(0);
512  // data-file might not have existed at entry -- data_stat is then undefined
513  if (data_existed)
514  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
515  }
516  }
517 
   // An accepted info record whose checksum state differs from the configured
   // one is either discarded (data truncated, purge reported) or has its
   // checksum state downgraded to the configured one.
518  if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
519  {
520  if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
521  conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
522  {
523  TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
524  initialize_info_file = true;
525  m_cfi.ResetAllAccessStats();
526  m_data_file->Ftruncate(0);
527  // data-file is known to exist due to checks in the previous if block
528  Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
529  } else {
530  // TODO: If the file is complete, we don't need to reset net cksums.
531  m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
532  }
533  }
534 
535  // Check if we have pfc url arguments.
536  long long pfc_blocksize = conf.m_bufferSize;
537  int pfc_prefetch = conf.m_prefetch_max_blocks;
   // NOTE(review): the statement controlling the brace below is not visible in
   // this listing (a line appears to be missing) -- confirm against the source.
539  {
540  parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
541  }
542 
543  if (initialize_info_file)
544  {
   // Fresh info record: set geometry and checksum state, write it out and
   // sync it, then store the file size as an extended attribute.
545  m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
546  m_cfi.SetCkSumState(conf.get_cs_Chk());
547  m_cfi.ResetNoCkSumTime();
548  m_cfi.Write(m_info_file, ifn.c_str());
549  m_info_file->Fsync();
550  cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
551 
552  if (cache()->RefConfiguration().m_httpcc)
553  {
554  std::string responseFctl;
555  int resFctl = inputIO->Fcntl(XrdOucCacheOp::Code::QFinfo, "head", responseFctl);
556  if (resFctl == 0)
557  {
558  std::string cc_str = responseFctl;
   // NOTE(review): nlohmann::json::parse throws on malformed input by default
   // and no try/catch is visible here -- confirm the response is always valid
   // JSON or that a caller handles the exception.
559  nlohmann::json cc_json = nlohmann::json::parse(cc_str);
560  if (cc_json.contains("max-age"))
561  {
   // Turn the relative "max-age" into an absolute "expire" time stamp.
562  time_t ma = cc_json["max-age"];
563  ma += time(NULL);
564  cc_json["expire"] = ma;
565  cc_str = cc_json.dump();
566  }
   // NOTE(review): this message is emitted at Error level although this is
   // the success path -- confirm the intended trace level.
567  TRACE(Error, "GetFile() XrdCl::File::Fcntl value " << cc_str);
568  cache()->WriteCacheControlXAttr(m_info_file->getFD(), nullptr, cc_str);
569  }
570  else if (resFctl != kXR_Unsupported)
571  {
572  TRACE(Error, "GetFile() XrdCl::File::Fcntl query XrdCl::QueryCode::FInfo failed " << inputIO->Path());
573  }
574  }
575 
576  TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size <<
577  " num blocks = " << m_cfi.GetNBlocks() <<
578  " block size = " << pfc_blocksize);
579  }
580  else
581  {
   // Existing info record kept: futimens() with NULL sets the info file's
   // time stamps to the current time.
582  if (futimens(m_info_file->getFD(), NULL)) {
583  TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
584  }
585  if (pfc_blocksize != conf.m_bufferSize) {
586  TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
587  }
588  }
589 
590  m_cfi.WriteIOStatAttach();
   // The remaining member setup is done under the state lock.
591  m_state_cond.Lock();
592  m_block_size = m_cfi.GetBufferSize();
593  m_num_blocks = m_cfi.GetNBlocks();
594  m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
595  m_prefetch_max_blocks_in_flight = pfc_prefetch;
596  if (pfc_prefetch != conf.m_prefetch_max_blocks)
597  TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");
598 
   // NOTE(review): the return value of this Fstat is not checked; on failure
   // data_stat may hold stale or undefined content -- confirm this is acceptable.
599  m_data_file->Fstat(&data_stat);
600  m_st_blocks = data_stat.st_blocks;
601 
602  m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
   // Reporting threshold: 5% of the file size, kept within [10 MB, 500 MB].
603  constexpr long long MB = 1024 * 1024;
604  m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
605  // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
606  // actual threshold based on return values from register_file_update_stats().
607 
608  m_state_cond.UnLock();
609 
610  return true;
611 }
612 
// Looks for the "pfc.blocksize" and "pfc.prefetch" parameters in the URL of
// inputIO and, when the configuration allows the respective parameter and its
// value converts successfully, overrides pfc_blocksize / pfc_prefetch.
// The output arguments are left untouched otherwise.
613 void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
614 {
615  const Configuration &conf = Cache::TheOne().RefConfiguration();
616 
617  XrdCl::URL url(inputIO->Path());
618  auto const & urlp = url.GetParams();
619 
   // Helper: copies the value stored under `key` into `value` and returns true;
   // clears `value` and returns false when the key is absent.
620  auto extract = [&](const std::string &key, std::string &value) -> bool {
621  auto it = urlp.find(key);
622  if (it != urlp.end()) {
623  value = it->second;
624  return true;
625  } else {
626  value.clear();
627  return false;
628  }
629  };
630 
631  std::string val;
632  if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
633  {
634  const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
635  long long bsize;
   // NOTE(review): the remaining arguments of this call are not visible in
   // this listing (a line appears to be missing).
636  if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
638  {
639  pfc_blocksize = bsize;
640  } else {
641  TRACEF(Error, tpfx << "Error processing the parameter.");
642  }
643  }
644  if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
645  {
646  const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
647  int pref;
   // NOTE(review): the remaining arguments of this call are not visible in
   // this listing (a line appears to be missing).
648  if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
650  {
651  pfc_prefetch = pref;
652  } else {
653  TRACEF(Error, tpfx << "Error processing the parameter.");
654  }
655  }
656 }
657 
658 //------------------------------------------------------------------------------
659 
660 int File::Fstat(struct stat &sbuff)
661 {
662  // Stat on an open file.
663  // Corrects size to actual full size of the file.
664  // Sets atime to 0 if the file is only partially downloaded, in accordance
665  // with pfc.onlyifcached settings.
666  // Called from IO::Fstat() and Cache::Stat() when the file is active.
667  // Returns 0 on success, -errno on error.
668 
669  int res;
670 
671  if ((res = m_data_file->Fstat(&sbuff))) return res;
672 
673  sbuff.st_size = m_file_size;
674 
675  bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
676  if ( ! is_cached)
677  sbuff.st_atime = 0;
678 
679  return 0;
680 }
681 
682 //==============================================================================
683 // Read and helpers
684 //==============================================================================
685 
686 bool File::overlap(int blk, // block to query
687  long long blk_size, //
688  long long req_off, // offset of user request
689  int req_size, // size of user request
690  // output:
691  long long &off, // offset in user buffer
692  long long &blk_off, // offset in block
693  int &size) // size to copy
694 {
695  const long long beg = blk * blk_size;
696  const long long end = beg + blk_size;
697  const long long req_end = req_off + req_size;
698 
699  if (req_off < end && req_end > beg)
700  {
701  const long long ovlp_beg = std::max(beg, req_off);
702  const long long ovlp_end = std::min(end, req_end);
703 
704  off = ovlp_beg - req_off;
705  blk_off = ovlp_beg - beg;
706  size = (int) (ovlp_end - ovlp_beg);
707 
708  assert(size <= blk_size);
709  return true;
710  }
711  else
712  {
713  return false;
714  }
715 }
716 
717 //------------------------------------------------------------------------------
718 
719 Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
720 {
721  // Must be called w/ state_cond locked.
722  // Checks on size etc should be done before.
723  //
724  // Reference count is 0 so increase it in calling function if you want to
725  // catch the block while still in memory.
726 
727  const long long off = i * m_block_size;
728  const int last_block = m_num_blocks - 1;
729  const bool cs_net = cache()->RefConfiguration().is_cschk_net();
730 
731  int blk_size, req_size;
732  if (i == last_block) {
733  blk_size = req_size = m_file_size - off;
734  if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
735  } else {
736  blk_size = req_size = m_block_size;
737  }
738 
739  Block *b = 0;
740  char *buf = cache()->RequestRAM(req_size);
741 
742  if (buf)
743  {
744  b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
745 
746  if (b)
747  {
748  m_block_map[i] = b;
749 
750  // Actual Read request is issued in ProcessBlockRequests().
751 
752  if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
753  {
754  m_prefetch_state = kHold;
755  cache()->DeRegisterPrefetchFile(this);
756  }
757  }
758  else
759  {
760  TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
761  }
762  }
763 
764  return b;
765 }
766 
// Issues the remote read for a single prepared block through the input of
// the block's IO; the response is delivered to the handler `brh`.
767 void File::ProcessBlockRequest(Block *b)
768 {
769  // This *must not* be called with block_map locked.
770 
   // NOTE(review): the declaration of `brh` (the response handler used below)
   // is not visible in this listing -- a line appears to be missing here.
772 
   // The description string is only formatted when Dump-level tracing is on.
773  if (XRD_TRACE What >= TRACE_Dump) {
774  char buf[256];
775  snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
776  b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
777  TRACEF(Dump, "ProcessBlockRequest() " << buf);
778  }
779 
   // With network checksums requested, use pgRead with the request size and
   // the block's checksum vector; otherwise a plain Read of the block size.
780  if (b->req_cksum_net())
781  {
782  b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
783  b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
784  } else {
785  b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
786  }
787 }
788 
789 void File::ProcessBlockRequests(BlockList_t& blks)
790 {
791  // This *must not* be called with block_map locked.
792 
793  for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
794  {
795  ProcessBlockRequest(*bi);
796  }
797 }
798 
799 //------------------------------------------------------------------------------
800 
801 void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
802 {
803  int n_chunks = ioVec.size();
804  int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
805 
806  TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
807  ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
808 
809  DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
810 
811  int pos = 0;
812  while (n_chunks > XrdProto::maxRvecsz) {
813  io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
814  pos += XrdProto::maxRvecsz;
815  n_chunks -= XrdProto::maxRvecsz;
816  }
817  io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
818 }
819 
820 //------------------------------------------------------------------------------
821 
822 int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
823 {
824  TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
825 
826  long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
827 
828  if (rs < 0)
829  {
830  TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
831  return rs;
832  }
833 
834  if (rs != expected_size)
835  {
836  TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
837  return -EIO;
838  }
839 
840  return (int) rs;
841 }
842 
843 //------------------------------------------------------------------------------
844 
845 int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
846 {
847  // rrc_func is ONLY called from async processing.
848  // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
849  // This streamlines implementation of synchronous IO::Read().
850 
851  TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
852 
853  m_state_cond.Lock();
854 
855  if (m_in_shutdown || io->m_in_detach)
856  {
857  m_state_cond.UnLock();
858  return m_in_shutdown ? -ENOENT : -EBADF;
859  }
860 
861  // Shortcut -- file is fully downloaded.
862 
863  if (m_cfi.IsComplete())
864  {
865  m_state_cond.UnLock();
866  int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
867  if (ret > 0) {
868  XrdSysCondVarHelper _lck(m_state_cond);
869  m_delta_stats.AddBytesHit(ret);
870  check_delta_stats();
871  }
872  return ret;
873  }
874 
875  XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
876 
877  return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
878 }
879 
880 //------------------------------------------------------------------------------
881 
882 int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
883 {
884  TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
885 
886  m_state_cond.Lock();
887 
888  if (m_in_shutdown || io->m_in_detach)
889  {
890  m_state_cond.UnLock();
891  return m_in_shutdown ? -ENOENT : -EBADF;
892  }
893 
894  // Shortcut -- file is fully downloaded.
895 
896  if (m_cfi.IsComplete())
897  {
898  m_state_cond.UnLock();
899  int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
900  if (ret > 0) {
901  XrdSysCondVarHelper _lck(m_state_cond);
902  m_delta_stats.AddBytesHit(ret);
903  check_delta_stats();
904  }
905  return ret;
906  }
907 
908  return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
909 }
910 
911 //------------------------------------------------------------------------------
912 
913 int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
914  ReadReqRH *rh, const char *tpfx)
915 {
916  // Non-trivial processing for Read and ReadV.
917  // Entered under lock.
918  //
919  // loop over required blocks:
920  // - if on disk, ok;
921  // - if in ram or incoming, inc ref-count
922  // - otherwise request and inc ref count (unless RAM full => request direct)
923  // unlock
924 
925  int prefetch_cnt = 0;
926 
927  ReadRequest *read_req = nullptr;
928  BlockList_t blks_to_request; // blocks we are issuing a new remote request for
929 
930  std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
931 
932  std::vector<XrdOucIOVec> iovec_disk;
933  std::vector<XrdOucIOVec> iovec_direct;
934  int iovec_disk_total = 0;
935  int iovec_direct_total = 0;
936 
937  for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
938  {
939  const XrdOucIOVec &iov = readV[iov_idx];
940  long long iUserOff = iov.offset;
941  int iUserSize = iov.size;
942  char *iUserBuff = iov.data;
943 
944  const int idx_first = iUserOff / m_block_size;
945  const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
946 
947  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
948 
949  enum LastBlock_e { LB_other, LB_disk, LB_direct };
950 
951  LastBlock_e lbe = LB_other;
952 
953  for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
954  {
955  TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
956  BlockMap_i bi = m_block_map.find(block_idx);
957 
958  // overlap and read
959  long long off; // offset in user buffer
960  long long blk_off; // offset in block
961  int size; // size to copy
962 
963  overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
964 
965  // In RAM or incoming?
966  if (bi != m_block_map.end())
967  {
968  inc_ref_count(bi->second);
969  TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
970 
971  if (bi->second->is_finished())
972  {
973  // note, blocks with error should not be here !!!
974  // they should be either removed or reissued in ProcessBlockResponse()
975  assert(bi->second->is_ok());
976 
977  blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
978 
979  if (bi->second->m_prefetch)
980  ++prefetch_cnt;
981  }
982  else
983  {
984  if ( ! read_req)
985  read_req = new ReadRequest(io, rh);
986 
987  // We have a lock on state_cond --> as we register the request before releasing the lock,
988  // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
989 
990  bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
991  ++read_req->m_n_chunk_reqs;
992  }
993 
994  lbe = LB_other;
995  }
996  // On disk?
997  else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
998  {
999  TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
1000 
1001  if (lbe == LB_disk)
1002  iovec_disk.back().size += size;
1003  else
1004  iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
1005  iovec_disk_total += size;
1006 
1007  if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
1008  ++prefetch_cnt;
1009 
1010  lbe = LB_disk;
1011  }
1012  // Neither ... then we have to go get it ...
1013  else
1014  {
1015  if ( ! read_req)
1016  read_req = new ReadRequest(io, rh);
1017 
1018  // Is there room for one more RAM Block?
1019  Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
1020  if (b)
1021  {
1022  TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
1023  inc_ref_count(b);
1024  blks_to_request.push_back(b);
1025 
1026  b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1027  ++read_req->m_n_chunk_reqs;
1028 
1029  lbe = LB_other;
1030  }
1031  else // Nope ... read this directly without caching.
1032  {
1033  TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1034 
1035  iovec_direct_total += size;
1036  read_req->m_direct_done = false;
1037 
1038  // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1039  // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1040  // is determined in the RequestBlocksDirect().
1041  if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1042  iovec_direct.back().size += size;
1043  } else {
1044  long long in_offset = block_idx * m_block_size + blk_off;
1045  char *out_pos = iUserBuff + off;
1046  while (size > XrdProto::maxRVdsz) {
1047  iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1048  in_offset += XrdProto::maxRVdsz;
1049  out_pos += XrdProto::maxRVdsz;
1050  size -= XrdProto::maxRVdsz;
1051  }
1052  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1053  }
1054 
1055  lbe = LB_direct;
1056  }
1057  }
1058  } // end for over blocks in an IOVec
1059  } // end for over readV IOVec
1060 
1061  inc_prefetch_hit_cnt(prefetch_cnt);
1062 
1063  m_state_cond.UnLock();
1064 
1065  // First, send out remote requests for new blocks.
1066  if ( ! blks_to_request.empty())
1067  {
1068  ProcessBlockRequests(blks_to_request);
1069  blks_to_request.clear();
1070  }
1071 
1072  // Second, send out remote direct read requests.
1073  if ( ! iovec_direct.empty())
1074  {
1075  RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1076 
1077  TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1078  }
1079 
1080  // Begin synchronous part where we process data that is already in RAM or on disk.
1081 
1082  long long bytes_read = 0;
1083  int error_cond = 0; // to be set to -errno
1084 
1085  // Third, process blocks that are available in RAM.
1086  if ( ! blks_ready.empty())
1087  {
1088  for (auto &bvi : blks_ready)
1089  {
1090  for (auto &cr : bvi.second)
1091  {
1092  TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1093  memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1094  bytes_read += cr.m_size;
1095  }
1096  }
1097  }
1098 
1099  // Fourth, read blocks from disk.
1100  if ( ! iovec_disk.empty())
1101  {
1102  int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1103  TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1104  if (rc >= 0)
1105  {
1106  bytes_read += rc;
1107  }
1108  else
1109  {
1110  error_cond = rc;
1111  TRACEF(Error, tpfx << "failed read from disk");
1112  }
1113  }
1114 
1115  // End synchronous part -- update with sync stats and determine actual state of this read.
1116  // Note: remote reads might have already finished during disk-read!
1117 
1118  m_state_cond.Lock();
1119 
1120  for (auto &bvi : blks_ready)
1121  dec_ref_count(bvi.first, (int) bvi.second.size());
1122 
1123  if (read_req)
1124  {
1125  read_req->m_bytes_read += bytes_read;
1126  if (error_cond)
1127  read_req->update_error_cond(error_cond);
1128  read_req->m_stats.m_BytesHit += bytes_read;
1129  read_req->m_sync_done = true;
1130 
1131  if (read_req->is_complete())
1132  {
1133  // Almost like FinalizeReadRequest(read_req) -- but no callout!
1134  m_delta_stats.AddReadStats(read_req->m_stats);
1135  check_delta_stats();
1136  m_state_cond.UnLock();
1137 
1138  int ret = read_req->return_value();
1139  delete read_req;
1140  return ret;
1141  }
1142  else
1143  {
1144  m_state_cond.UnLock();
1145  return -EWOULDBLOCK;
1146  }
1147  }
1148  else
1149  {
1150  m_delta_stats.m_BytesHit += bytes_read;
1151  check_delta_stats();
1152  m_state_cond.UnLock();
1153 
1154  // !!! No callout.
1155 
1156  return error_cond ? error_cond : bytes_read;
1157  }
1158 }
1159 
1160 
1161 //==============================================================================
1162 // WriteBlock and Sync
1163 //==============================================================================
1164 
1166 {
1167  // write block buffer into disk file
1168  long long offset = b->m_offset - m_offset;
1169  long long size = b->get_size();
1170  ssize_t retval;
1171 
1172  if (m_cfi.IsCkSumCache())
1173  if (b->has_cksums())
1174  retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1175  else
1176  retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1177  else
1178  retval = m_data_file->Write(b->get_buff(), offset, size);
1179 
1180  if (retval < size)
1181  {
1182  if (retval < 0) {
1183  TRACEF(Error, "WriteToDisk() write error " << retval);
1184  } else {
1185  TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1186  }
1187 
1188  XrdSysCondVarHelper _lck(m_state_cond);
1189 
1190  dec_ref_count(b);
1191 
1192  return;
1193  }
1194 
1195  const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1196 
1197  // Set written bit.
1198  TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1199 
1200  bool schedule_sync = false;
1201  {
1202  XrdSysCondVarHelper _lck(m_state_cond);
1203 
1204  m_cfi.SetBitWritten(blk_idx);
1205 
1206  if (b->m_prefetch)
1207  {
1208  m_cfi.SetBitPrefetch(blk_idx);
1209  }
1210  if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1211  {
1212  m_cfi.ResetCkSumNet();
1213  }
1214 
1215  // Set synced bit or stash block index if in actual sync.
1216  // Synced state is only written out to cinfo file when data file is synced.
1217  if (m_in_sync)
1218  {
1219  m_writes_during_sync.push_back(blk_idx);
1220  }
1221  else
1222  {
1223  m_cfi.SetBitSynced(blk_idx);
1224  ++m_non_flushed_cnt;
1225  if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1226  ! m_in_shutdown)
1227  {
1228  schedule_sync = true;
1229  m_in_sync = true;
1230  m_non_flushed_cnt = 0;
1231  }
1232  }
1233  // As soon as the reference count is decreased on the block, the
1234  // file object may be deleted. Thus, to avoid holding both locks at a time,
1235  // we defer the ref count decrease until later if a sync is needed
1236  if (!schedule_sync) {
1237  dec_ref_count(b);
1238  }
1239  }
1240 
1241  if (schedule_sync)
1242  {
1243  cache()->ScheduleFileSync(this);
1244  XrdSysCondVarHelper _lck(m_state_cond);
1245  dec_ref_count(b);
1246  }
1247 }
1248 
1249 //------------------------------------------------------------------------------
1250 
1251 void File::Sync()
1252 {
1253  TRACEF(Dump, "Sync()");
1254 
1255  int ret = m_data_file->Fsync();
1256  bool errorp = false;
1257  if (ret == XrdOssOK)
1258  {
1259  Stats loc_stats;
1260  {
1261  XrdSysCondVarHelper _lck(&m_state_cond);
1262  report_and_merge_delta_stats();
1263  loc_stats = m_stats;
1264  }
1265  m_cfi.WriteIOStat(loc_stats);
1266  m_cfi.Write(m_info_file, m_filename.c_str());
1267  int cret = m_info_file->Fsync();
1268  if (cret != XrdOssOK)
1269  {
1270  TRACEF(Error, "Sync cinfo file sync error " << cret);
1271  errorp = true;
1272  }
1273  }
1274  else
1275  {
1276  TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1277  errorp = true;
1278  }
1279 
1280  if (errorp)
1281  {
1282  TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1283 
1284  // Unlink will also call this->initiate_emergency_shutdown()
1285  Cache::GetInstance().UnlinkFile(m_filename, false);
1286 
1287  XrdSysCondVarHelper _lck(&m_state_cond);
1288 
1289  m_writes_during_sync.clear();
1290  m_in_sync = false;
1291 
1292  return;
1293  }
1294 
1295  int written_while_in_sync;
1296  bool resync = false;
1297  {
1298  XrdSysCondVarHelper _lck(&m_state_cond);
1299  for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1300  {
1301  m_cfi.SetBitSynced(*i);
1302  }
1303  written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1304  m_writes_during_sync.clear();
1305 
1306  // If there were writes during sync and the file is now complete,
1307  // let us call Sync again without resetting the m_in_sync flag.
1308  if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1309  resync = true;
1310  else
1311  m_in_sync = false;
1312  }
1313  TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1314 
1315  if (resync)
1316  Sync();
1317 }
1318 
1319 
1320 //==============================================================================
1321 // Block processing
1322 //==============================================================================
1323 
1324 void File::free_block(Block* b)
1325 {
1326  // Method always called under lock.
1327  int i = b->m_offset / m_block_size;
1328  TRACEF(Dump, "free_block block " << b << " idx = " << i);
1329  size_t ret = m_block_map.erase(i);
1330  if (ret != 1)
1331  {
1332  // assert might be a better option than a warning
1333  TRACEF(Error, "free_block did not erase " << i << " from map");
1334  }
1335  else
1336  {
1337  cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1338  delete b;
1339  }
1340 
1341  if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1342  {
1343  m_prefetch_state = kOn;
1344  cache()->RegisterPrefetchFile(this);
1345  }
1346 }
1347 
1348 //------------------------------------------------------------------------------
1349 
1350 bool File::select_current_io_or_disable_prefetching(bool skip_current)
1351 {
1352  // Method always called under lock. It also expects prefetch to be active.
1353 
1354  int io_size = (int) m_io_set.size();
1355  bool io_ok = false;
1356 
1357  if (io_size == 1)
1358  {
1359  io_ok = (*m_io_set.begin())->m_allow_prefetching;
1360  if (io_ok)
1361  {
1362  m_current_io = m_io_set.begin();
1363  }
1364  }
1365  else if (io_size > 1)
1366  {
1367  IoSet_i mi = m_current_io;
1368  if (skip_current && mi != m_io_set.end()) ++mi;
1369 
1370  for (int i = 0; i < io_size; ++i)
1371  {
1372  if (mi == m_io_set.end()) mi = m_io_set.begin();
1373 
1374  if ((*mi)->m_allow_prefetching)
1375  {
1376  m_current_io = mi;
1377  io_ok = true;
1378  break;
1379  }
1380  ++mi;
1381  }
1382  }
1383 
1384  if ( ! io_ok)
1385  {
1386  m_current_io = m_io_set.end();
1387  m_prefetch_state = kStopped;
1388  cache()->DeRegisterPrefetchFile(this);
1389  }
1390 
1391  return io_ok;
1392 }
1393 
1394 //------------------------------------------------------------------------------
1395 
1396 void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1397 {
1398  // Called from DirectResponseHandler.
1399  // NOT under lock.
1400 
1401  if (error_cond)
1402  TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1403 
1404  m_state_cond.Lock();
1405 
1406  if (error_cond)
1407  rreq->update_error_cond(error_cond);
1408  else {
1409  rreq->m_stats.m_BytesBypassed += bytes_read;
1410  rreq->m_bytes_read += bytes_read;
1411  }
1412 
1413  rreq->m_direct_done = true;
1414 
1415  bool rreq_complete = rreq->is_complete();
1416 
1417  m_state_cond.UnLock();
1418 
1419  if (rreq_complete)
1420  FinalizeReadRequest(rreq);
1421 }
1422 
1423 void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1424 {
1425  // Called from ProcessBlockResponse().
1426  // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1427  // Does not manage m_read_req.
1428  // Will not complete the request.
1429 
1430  TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1431  " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1432 
1433  rreq->update_error_cond(b->get_error());
1434  --rreq->m_n_chunk_reqs;
1435 
1436  dec_ref_count(b);
1437 }
1438 
1439 void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1440 {
1441  // Called from ProcessBlockResponse().
1442  // NOT under lock as it does memcopy ofor exisf block data.
1443  // Acquires lock for block, m_read_req and rreq state update.
1444 
1445  ReadRequest *rreq = creq.m_read_req;
1446 
1447  TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1448  memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1449 
1450  m_state_cond.Lock();
1451 
1452  rreq->m_bytes_read += creq.m_size;
1453 
1454  if (b->get_req_id() == (void*) rreq)
1455  rreq->m_stats.m_BytesMissed += creq.m_size;
1456  else
1457  rreq->m_stats.m_BytesHit += creq.m_size;
1458 
1459  --rreq->m_n_chunk_reqs;
1460 
1461  if (b->m_prefetch)
1462  inc_prefetch_hit_cnt(1);
1463 
1464  dec_ref_count(b);
1465 
1466  bool rreq_complete = rreq->is_complete();
1467 
1468  m_state_cond.UnLock();
1469 
1470  if (rreq_complete)
1471  FinalizeReadRequest(rreq);
1472 }
1473 
1474 void File::FinalizeReadRequest(ReadRequest *rreq)
1475 {
1476  // called from ProcessBlockResponse()
1477  // NOT under lock -- does callout
1478  {
1479  XrdSysCondVarHelper _lck(m_state_cond);
1480  m_delta_stats.AddReadStats(rreq->m_stats);
1481  check_delta_stats();
1482  }
1483 
1484  rreq->m_rh->Done(rreq->return_value());
1485  delete rreq;
1486 }
1487 
1488 void File::ProcessBlockResponse(Block *b, int res)
1489 {
1490  static const char* tpfx = "ProcessBlockResponse ";
1491 
1492  TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1493 
1494  if (res >= 0 && res != b->get_size())
1495  {
1496  // Incorrect number of bytes received, apparently size of the file on the remote
1497  // is different than what the cache expects it to be.
1498  TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1499  Cache::GetInstance().UnlinkFile(m_filename, false);
1500  }
1501 
1502  m_state_cond.Lock();
1503 
1504  // Deregister block from IO's prefetch count, if needed.
1505  if (b->m_prefetch)
1506  {
1507  IO *io = b->get_io();
1508  IoSet_i mi = m_io_set.find(io);
1509  if (mi != m_io_set.end())
1510  {
1511  --io->m_active_prefetches;
1512 
1513  // If failed and IO is still prefetching -- disable prefetching on this IO.
1514  if (res < 0 && io->m_allow_prefetching)
1515  {
1516  TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1517  io->m_allow_prefetching = false;
1518 
1519  // Check if any IO is still available for prfetching. If not, stop it.
1520  if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1521  {
1522  if ( ! select_current_io_or_disable_prefetching(false) )
1523  {
1524  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1525  }
1526  }
1527  }
1528 
1529  // If failed with no subscribers -- delete the block and exit.
1530  if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1531  {
1532  free_block(b);
1533  m_state_cond.UnLock();
1534  return;
1535  }
1536  m_prefetch_bytes += b->get_size();
1537  }
1538  else
1539  {
1540  TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1541  }
1542  }
1543 
1544  if (res == b->get_size())
1545  {
1546  b->set_downloaded();
1547  TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1548  if ( ! m_in_shutdown)
1549  {
1550  // Increase ref-count for the writer.
1551  inc_ref_count(b);
1552  m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1553  // No check for writes, report-and-merge forced during Sync().
1554  cache()->AddWriteTask(b, true);
1555  }
1556 
1557  // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1558  vChunkRequest_t creqs_to_notify;
1559  creqs_to_notify.swap( b->m_chunk_reqs );
1560 
1561  m_state_cond.UnLock();
1562 
1563  for (auto &creq : creqs_to_notify)
1564  {
1565  ProcessBlockSuccess(b, creq);
1566  }
1567  }
1568  else
1569  {
1570  if (res < 0) {
1571  bool new_error = b->get_io()->register_block_error(res);
1572  int tlvl = new_error ? TRACE_Error : TRACE_Debug;
1573  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1574  << ", io=" << b->get_io() << ", error=" << res);
1575  } else {
1576  bool first_p = b->get_io()->register_incomplete_read();
1577  int tlvl = first_p ? TRACE_Error : TRACE_Debug;
1578  TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
1579  << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
1580 #if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1581  res = -EIO;
1582 #else
1583  res = -EREMOTEIO;
1584 #endif
1585  }
1586  b->set_error(res);
1587 
1588  // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1589  // Collect others with a different IO, the first of them will be used to reissue the request.
1590  // This is then done outside of lock.
1591  std::list<ReadRequest*> rreqs_to_complete;
1592  vChunkRequest_t creqs_to_keep;
1593 
1594  for(ChunkRequest &creq : b->m_chunk_reqs)
1595  {
1596  ReadRequest *rreq = creq.m_read_req;
1597 
1598  if (rreq->m_io == b->get_io())
1599  {
1600  ProcessBlockError(b, rreq);
1601  if (rreq->is_complete())
1602  {
1603  rreqs_to_complete.push_back(rreq);
1604  }
1605  }
1606  else
1607  {
1608  creqs_to_keep.push_back(creq);
1609  }
1610  }
1611 
1612  bool reissue = false;
1613  if ( ! creqs_to_keep.empty())
1614  {
1615  ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1616 
1617  TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1618  b->get_io() << " - reissuing request with my io " << rreq->m_io);
1619 
1620  b->reset_error_and_set_io(rreq->m_io, rreq);
1621  b->m_chunk_reqs.swap( creqs_to_keep );
1622  reissue = true;
1623  }
1624 
1625  m_state_cond.UnLock();
1626 
1627  for (auto rreq : rreqs_to_complete)
1628  FinalizeReadRequest(rreq);
1629 
1630  if (reissue)
1631  ProcessBlockRequest(b);
1632  }
1633 }
1634 
1635 //------------------------------------------------------------------------------
1636 
1637 const char* File::lPath() const
1638 {
1639  return m_filename.c_str();
1640 }
1641 
1642 //------------------------------------------------------------------------------
1643 
1644 int File::offsetIdx(int iIdx) const
1645 {
1646  return iIdx - m_offset/m_block_size;
1647 }
1648 
1649 
1650 //------------------------------------------------------------------------------
1651 
1652 void File::Prefetch()
1653 {
1654  // Check that block is not on disk and not in RAM.
1655  // TODO: Could prefetch several blocks at once!
1656  // blks_max could be an argument
1657 
1658  BlockList_t blks;
1659 
1660  TRACEF(DumpXL, "Prefetch() entering.");
1661  {
1662  XrdSysCondVarHelper _lck(m_state_cond);
1663 
1664  if (m_prefetch_state != kOn)
1665  {
1666  return;
1667  }
1668 
1669  if ( ! select_current_io_or_disable_prefetching(true) )
1670  {
1671  TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1672  return;
1673  }
1674 
1675  // Select block(s) to fetch.
1676  for (int f = 0; f < m_num_blocks; ++f)
1677  {
1678  if ( ! m_cfi.TestBitWritten(f))
1679  {
1680  int f_act = f + m_offset / m_block_size;
1681 
1682  BlockMap_i bi = m_block_map.find(f_act);
1683  if (bi == m_block_map.end())
1684  {
1685  Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1686  if (b)
1687  {
1688  TRACEF(Dump, "Prefetch take block " << f_act);
1689  blks.push_back(b);
1690  // Note: block ref_cnt not increased, it will be when placed into write queue.
1691 
1692  inc_prefetch_read_cnt(1);
1693  }
1694  else
1695  {
1696  // This shouldn't happen as prefetching stops when RAM is 70% full.
1697  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1698  }
1699  break;
1700  }
1701  }
1702  }
1703 
1704  if (blks.empty())
1705  {
1706  TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1707  m_prefetch_state = kComplete;
1708  cache()->DeRegisterPrefetchFile(this);
1709  }
1710  else
1711  {
1712  (*m_current_io)->m_active_prefetches += (int) blks.size();
1713  }
1714  }
1715 
1716  if ( ! blks.empty())
1717  {
1718  ProcessBlockRequests(blks);
1719  }
1720 }
1721 
1722 
1723 //------------------------------------------------------------------------------
1724 
1725 float File::GetPrefetchScore() const
1726 {
1727  return m_prefetch_score;
1728 }
1729 
1730 XrdSysError* File::GetLog() const
1731 {
1732  return Cache::TheOne().GetLog();
1733 }
1734 
1735 XrdSysTrace* File::GetTrace() const
1736 {
1737  return Cache::TheOne().GetTrace();
1738 }
1739 
1740 void File::insert_remote_location(const std::string &loc)
1741 {
1742  if ( ! loc.empty())
1743  {
1744  size_t p = loc.find_first_of('@');
1745  m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1746  }
1747 }
1748 
1749 std::string File::GetRemoteLocations() const
1750 {
1751  std::string s;
1752  if ( ! m_remote_locations.empty())
1753  {
1754  size_t sl = 0;
1755  int nl = 0;
1756  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1757  {
1758  sl += i->size();
1759  }
1760  s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1761  s = '[';
1762  int j = 1;
1763  for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1764  {
1765  s += '"'; s += *i; s += '"';
1766  if (j < nl) s += ',';
1767  }
1768  s += ']';
1769  }
1770  else
1771  {
1772  s = "[]";
1773  }
1774  return s;
1775 }
1776 
1777 //==============================================================================
1778 //======================= RESPONSE HANDLERS ==============================
1779 //==============================================================================
1780 
1782 {
1783  m_block->m_file->ProcessBlockResponse(m_block, res);
1784  delete this;
1785 }
1786 
1787 //------------------------------------------------------------------------------
1788 
1790 {
1791  m_mutex.Lock();
1792 
1793  int n_left = --m_to_wait;
1794 
1795  if (res < 0) {
1796  if (m_errno == 0) m_errno = res; // store first reported error
1797  } else {
1798  m_bytes_read += res;
1799  }
1800 
1801  m_mutex.UnLock();
1802 
1803  if (n_left == 0)
1804  {
1805  m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1806  delete this;
1807  }
1808 }
@ kXR_Unsupported
Definition: XProtocol.hh:1045
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
nlohmann::json json
#define XrdOssOK
Definition: XrdOss.hh:54
#define XRDOSS_mkpath
Definition: XrdOss.hh:526
#define TRACE_Error
Definition: XrdPfcTrace.hh:7
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACEF(act, x)
Definition: XrdPfcTrace.hh:67
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
#define TRACEF_INT(act, x)
Definition: XrdPfcTrace.hh:71
#define stat(a, b)
Definition: XrdPosix.hh:105
#define XRD_TRACE
Definition: XrdScheduler.cc:48
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
virtual int Fsync()
Definition: XrdOss.hh:172
virtual int Ftruncate(unsigned long long flen)
Definition: XrdOss.hh:192
virtual int Fstat(struct stat *buf)
Definition: XrdOss.hh:164
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:486
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:228
virtual ssize_t Read(off_t offset, size_t size)
Definition: XrdOss.hh:310
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition: XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition: XrdOss.cc:252
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition: XrdOss.hh:385
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int Fcntl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
Definition: XrdOucCache.hh:148
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
Definition: XrdOucCache.cc:39
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
Definition: XrdOucCache.cc:86
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
void Done(int result) override
Definition: XrdPfcFile.cc:1781
int * ptr_n_cksum_errors()
Definition: XrdPfcFile.hh:163
int get_size() const
Definition: XrdPfcFile.hh:136
int get_error() const
Definition: XrdPfcFile.hh:150
int get_n_cksum_errors()
Definition: XrdPfcFile.hh:162
void * get_req_id() const
Definition: XrdPfcFile.hh:142
long long get_offset() const
Definition: XrdPfcFile.hh:138
vChunkRequest_t m_chunk_reqs
Definition: XrdPfcFile.hh:125
void set_error(int err)
Definition: XrdPfcFile.hh:149
vCkSum_t & ref_cksum_vec()
Definition: XrdPfcFile.hh:161
char * get_buff() const
Definition: XrdPfcFile.hh:135
IO * get_io() const
Definition: XrdPfcFile.hh:141
void set_downloaded()
Definition: XrdPfcFile.hh:148
bool req_cksum_net() const
Definition: XrdPfcFile.hh:159
bool has_cksums() const
Definition: XrdPfcFile.hh:160
long long m_offset
Definition: XrdPfcFile.hh:114
void reset_error_and_set_io(IO *io, void *rid)
Definition: XrdPfcFile.hh:152
int get_req_size() const
Definition: XrdPfcFile.hh:137
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:169
XrdOss * GetOss() const
Definition: XrdPfc.hh:290
bool blocksize_str2value(const char *from, const char *str, long long &val, long long min, long long max) const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:225
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:139
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:136
XrdSysTrace * GetTrace() const
Definition: XrdPfc.hh:305
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1264
static const Cache & TheOne()
Definition: XrdPfc.cc:137
XrdSysError * GetLog() const
Definition: XrdPfc.hh:304
void Done(int result) override
Definition: XrdPfcFile.cc:1789
const char * lPath() const
Log path.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
friend class BlockResponseHandler
Definition: XrdPfcFile.hh:205
void RemoveIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
void Sync()
Sync file cache info and output data with disk.
float GetPrefetchScore() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
friend class DirectResponseHandler
Definition: XrdPfcFile.hh:206
void Prefetch()
void WriteBlockToDisk(Block *b)
long long initiate_emergency_shutdown()
XrdSysError * GetLog() const
std::string GetRemoteLocations() const
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysTrace * GetTrace() const
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
bool register_incomplete_read()
Definition: XrdPfcIO.hh:87
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
bool register_block_error(int res)
Definition: XrdPfcIO.hh:90
RAtomic_int m_active_read_reqs
number of active read requests
Definition: XrdPfcIO.hh:67
const char * GetLocation()
Definition: XrdPfcIO.hh:41
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
Definition: XrdPfcInfo.hh:365
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void SetBitSynced(int i)
Mark block as synced to disk.
Definition: XrdPfcInfo.hh:387
time_t GetNoCkSumTimeForUVKeep() const
Definition: XrdPfcInfo.hh:301
CkSumCheck_e GetCkSumState() const
Definition: XrdPfcInfo.hh:286
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
Definition: XrdPfcInfo.cc:420
void ResetCkSumNet()
Definition: XrdPfcInfo.cc:213
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:266
void DowngradeCkSumState(CkSumCheck_e css_ref)
Definition: XrdPfcInfo.hh:295
bool IsCkSumNet() const
Definition: XrdPfcInfo.hh:290
void ResetAllAccessStats()
Reset IO Stats.
Definition: XrdPfcInfo.cc:359
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
Definition: XrdPfcInfo.hh:376
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
bool IsCkSumCache() const
Definition: XrdPfcInfo.hh:289
void SetBitWritten(int i)
Mark block as written to disk.
Definition: XrdPfcInfo.hh:352
long long GetBufferSize() const
Get prefetch buffer size.
Definition: XrdPfcInfo.hh:469
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:429
long long GetExpectedDataFileSize() const
Get expected data file size.
Definition: XrdPfcInfo.hh:420
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
Definition: XrdPfcInfo.hh:343
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:294
void SetCkSumState(CkSumCheck_e css)
Definition: XrdPfcInfo.hh:294
void ResetNoCkSumTime()
Definition: XrdPfcInfo.hh:302
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:161
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
Definition: XrdPfcInfo.cc:438
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Definition: XrdPfcInfo.hh:437
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
void IoAttach()
Definition: XrdPfcStats.hh:85
void AddReadStats(const Stats &s)
Definition: XrdPfcStats.hh:67
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesBypassed
number of bytes served directly through XrdCl
Definition: XrdPfcStats.hh:41
void AddUp(const Stats &s)
Definition: XrdPfcStats.hh:119
void AddWriteStats(long long bytes_written, int n_cks_errs)
Definition: XrdPfcStats.hh:79
long long BytesReadAndWritten() const
Definition: XrdPfcStats.hh:102
void AddBytesHit(long long bh)
Definition: XrdPfcStats.hh:74
long long m_BytesHit
number of bytes served from disk
Definition: XrdPfcStats.hh:39
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
void IoDetach(int duration)
Definition: XrdPfcStats.hh:90
@ Warning
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, time_t timeout=0)
Factory for creating OpenImpl objects.
Definition: XrdPfc.hh:43
std::vector< ChunkRequest > vChunkRequest_t
Definition: XrdPfcFile.hh:101
std::list< Block * > BlockList_t
Definition: XrdPfcFile.hh:166
std::list< Block * >::iterator BlockList_i
Definition: XrdPfcFile.hh:167
static const int maxRVdsz
Definition: XProtocol.hh:724
static const int maxRvecsz
Definition: XProtocol.hh:722
@ hex1
Definition: XrdSysTrace.hh:42
long long offset
Definition: XrdOucIOVec.hh:42
char * data
Definition: XrdOucIOVec.hh:45
ReadRequest * m_read_req
Definition: XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:66
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:125
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:118
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:119
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:82
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition: XrdPfc.hh:122
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:75
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:115
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:84
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:91
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:110
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:117
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:92
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:120
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:90
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition: XrdPfc.hh:121
unsigned short m_seq_id
Definition: XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition: XrdPfcFile.hh:81
ReadReqRH * m_rh
Definition: XrdPfcFile.hh:66
bool is_complete() const
Definition: XrdPfcFile.hh:83
int return_value() const
Definition: XrdPfcFile.hh:84
long long m_bytes_read
Definition: XrdPfcFile.hh:68