XRootD
XrdPfc.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <fcntl.h>
20 #include <sstream>
21 #include <algorithm>
22 #include <sys/statvfs.h>
23 
24 #include "XrdCl/XrdClURL.hh"
25 #include "XrdCl/XrdClFileSystem.hh"
27 
28 #include "XrdOuc/XrdOucEnv.hh"
29 #include "XrdOuc/XrdOucUtils.hh"
31 #include "XrdOuc/XrdOucJson.hh"
32 
33 #include "XrdSys/XrdSysTimer.hh"
34 #include "XrdSys/XrdSysTrace.hh"
35 #include "XrdSys/XrdSysXAttr.hh"
36 
38 
39 #include "XrdOss/XrdOss.hh"
41 
42 #include "XrdPfc.hh"
43 #include "XrdPfcTrace.hh"
44 #include "XrdPfcFSctl.hh"
45 #include "XrdPfcInfo.hh"
46 #include "XrdPfcIOFile.hh"
47 #include "XrdPfcIOFileBlock.hh"
48 #include "XrdPfcResourceMonitor.hh"
49 
51 
52 using namespace XrdPfc;
53 
54 Cache *Cache::m_instance = nullptr;
55 XrdScheduler *Cache::schedP = nullptr;
56 
57 
59 {
61  return 0;
62 }
63 
65 {
67  return 0;
68 }
69 
// Thread start routine; it is handed to XrdSysThread::Run() under the
// thread name "XrdPfc Prefetch " when prefetching is enabled.
// The void* argument is unnamed and unused in the lines visible here.
// NOTE(review): this listing jumps from source line 71 to 73, so a statement
// may be missing from the body -- confirm against the original file.
70 void *PrefetchThread(void*)
71 {
73  return 0;
74 }
75 
76 //==============================================================================
77 
78 extern "C"
79 {
81  const char *config_filename,
82  const char *parameters,
83  XrdOucEnv *env)
84 {
85  XrdSysError err(logger, "");
86  err.Say("++++++ Proxy file cache initialization started.");
87 
88  if ( ! env ||
89  ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
90  {
93  }
94 
95  Cache &instance = Cache::CreateInstance(logger, env);
96 
97  if (! instance.Config(config_filename, parameters, env))
98  {
99  err.Say("Config Proxy file cache initialization failed.");
100  return 0;
101  }
102  err.Say("++++++ Proxy file cache initialization completed.");
103 
104  {
105  pthread_t tid;
106 
107  XrdSysThread::Run(&tid, ResourceMonitorThread, 0, 0, "XrdPfc ResourceMonitor");
108 
109  for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
110  {
111  XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
112  }
113 
114  if (instance.is_prefetch_enabled())
115  {
116  XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
117  }
118  }
119 
120  XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
121  env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
122 
123  return &instance;
124 }
125 }
126 
127 //==============================================================================
128 
130 {
131  assert (m_instance == 0);
132  m_instance = new Cache(logger, env);
133  return *m_instance;
134 }
135 
136  Cache& Cache::GetInstance() { return *m_instance; }
137 const Cache& Cache::TheOne() { return *m_instance; }
138 const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
139  ResourceMonitor& Cache::ResMon() { return m_instance->RefResMon(); }
140 
142 {
143  if (! m_decisionpoints.empty())
144  {
145  XrdCl::URL url(io->Path());
146  std::string filename = url.GetPath();
147  std::vector<Decision*>::const_iterator it;
148  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
149  {
150  XrdPfc::Decision *d = *it;
151  if (! d) continue;
152  if (! d->Decide(filename, *m_oss))
153  {
154  return false;
155  }
156  }
157  }
158 
159  return true;
160 }
161 
163  XrdOucCache("pfc"),
164  m_env(env),
165  m_log(logger, "XrdPfc_"),
166  m_trace(new XrdSysTrace("XrdPfc", logger)),
167  m_traceID("Cache"),
168  m_oss(0),
169  m_gstream(0),
170  m_purge_pin(0),
171  m_prefetch_condVar(0),
172  m_prefetch_enabled(false),
173  m_RAM_used(0),
174  m_RAM_write_queue(0),
175  m_RAM_std_size(0),
176  m_isClient(false),
177  m_active_cond(0)
178 {
179  // Default log level is Warning.
180  m_trace->What = 2;
181 }
182 
184 {
185  const char* tpfx = "Attach() ";
186 
187  if (Options & XrdOucCache::optRW)
188  {
189  TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
190  }
191  else if (Cache::GetInstance().Decide(io))
192  {
193  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
194 
195  IO *cio;
196 
197  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
198  {
199  cio = new IOFileBlock(io, *this);
200  }
201  else
202  {
203  IOFile *iof = new IOFile(io, *this);
204 
205  if ( ! iof->HasFile())
206  {
207  delete iof;
208  // TODO - redirect instead. But this is kind of an awkward place for it.
209  // errno is set during IOFile construction.
210  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
211  return io;
212  }
213 
214  cio = iof;
215  }
216 
217  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
218  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
219 
220  return cio;
221  }
222  else
223  {
224  TRACE(Info, tpfx << "decision decline " << io->Path());
225  }
226  return io;
227 }
228 
229 void Cache::AddWriteTask(Block* b, bool fromRead)
230 {
231  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
232 
233  {
234  XrdSysMutexHelper lock(&m_RAM_mutex);
235  m_RAM_write_queue += b->get_size();
236  }
237 
238  m_writeQ.condVar.Lock();
239  if (fromRead)
240  m_writeQ.queue.push_back(b);
241  else
242  m_writeQ.queue.push_front(b);
243  m_writeQ.size++;
244  m_writeQ.condVar.Signal();
245  m_writeQ.condVar.UnLock();
246 }
247 
249 {
250  std::list<Block*> removed_blocks;
251  long long sum_size = 0;
252 
253  m_writeQ.condVar.Lock();
254  std::list<Block*>::iterator i = m_writeQ.queue.begin();
255  while (i != m_writeQ.queue.end())
256  {
257  if ((*i)->m_file == file)
258  {
259  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
260  std::list<Block*>::iterator j = i++;
261  removed_blocks.push_back(*j);
262  sum_size += (*j)->get_size();
263  m_writeQ.queue.erase(j);
264  --m_writeQ.size;
265  }
266  else
267  {
268  ++i;
269  }
270  }
271  m_writeQ.condVar.UnLock();
272 
273  {
274  XrdSysMutexHelper lock(&m_RAM_mutex);
275  m_RAM_write_queue -= sum_size;
276  }
277 
278  file->BlocksRemovedFromWriteQ(removed_blocks);
279 }
280 
282 {
283  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
284 
285  while (true)
286  {
287  m_writeQ.condVar.Lock();
288  while (m_writeQ.size == 0)
289  {
290  m_writeQ.condVar.Wait();
291  }
292 
293  // MT -- optimize to pop several blocks if they are available (or swap the list).
294  // This makes sense especially for smallish block sizes.
295 
296  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
297  long long sum_size = 0;
298 
299  for (int bi = 0; bi < n_pushed; ++bi)
300  {
301  Block* block = m_writeQ.queue.front();
302  m_writeQ.queue.pop_front();
303  m_writeQ.writes_between_purges += block->get_size();
304  sum_size += block->get_size();
305 
306  blks_to_write[bi] = block;
307 
308  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
309  }
310  m_writeQ.size -= n_pushed;
311 
312  m_writeQ.condVar.UnLock();
313 
314  {
315  XrdSysMutexHelper lock(&m_RAM_mutex);
316  m_RAM_write_queue -= sum_size;
317  }
318 
319  for (int bi = 0; bi < n_pushed; ++bi)
320  {
321  Block* block = blks_to_write[bi];
322 
323  block->m_file->WriteBlockToDisk(block);
324  }
325  }
326 }
327 
329 {
330  // Called from ResourceMonitor for an alternative estimation of disk writes.
331  XrdSysCondVarHelper lock(&m_writeQ.condVar);
332  long long ret = m_writeQ.writes_between_purges;
333  m_writeQ.writes_between_purges = 0;
334  return ret;
335 }
336 
337 //==============================================================================
338 
339 char* Cache::RequestRAM(long long size)
340 {
341  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
342 
343  bool std_size = (size == m_configuration.m_bufferSize);
344 
345  m_RAM_mutex.Lock();
346 
347  long long total = m_RAM_used + size;
348 
349  if (total <= m_configuration.m_RamAbsAvailable)
350  {
351  m_RAM_used = total;
352  if (std_size && m_RAM_std_size > 0)
353  {
354  char *buf = m_RAM_std_blocks.back();
355  m_RAM_std_blocks.pop_back();
356  --m_RAM_std_size;
357 
358  m_RAM_mutex.UnLock();
359 
360  return buf;
361  }
362  else
363  {
364  m_RAM_mutex.UnLock();
365  char *buf;
366  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
367  {
368  // Report out of mem? Probably should report it at least the first time,
369  // then periodically.
370  return 0;
371  }
372  return buf;
373  }
374  }
375  m_RAM_mutex.UnLock();
376  return 0;
377 }
378 
379 void Cache::ReleaseRAM(char* buf, long long size)
380 {
381  bool std_size = (size == m_configuration.m_bufferSize);
382  {
383  XrdSysMutexHelper lock(&m_RAM_mutex);
384 
385  m_RAM_used -= size;
386 
387  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
388  {
389  m_RAM_std_blocks.push_back(buf);
390  ++m_RAM_std_size;
391  return;
392  }
393  }
394  free(buf);
395 }
396 
397 File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
398 {
399  // Called from virtual IOFile constructor.
400 
401  TRACE(Debug, "GetFile " << path << ", io " << io);
402 
403  ActiveMap_i it;
404 
405  {
406  XrdSysCondVarHelper lock(&m_active_cond);
407 
408  while (true)
409  {
410  it = m_active.find(path);
411 
412  // File is not open or being opened. Mark it as being opened and
413  // proceed to opening it outside of while loop.
414  if (it == m_active.end())
415  {
416  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
417  break;
418  }
419 
420  if (it->second != 0)
421  {
422  it->second->AddIO(io);
423  inc_ref_cnt(it->second, false, true);
424 
425  return it->second;
426  }
427  else
428  {
429  // Wait for some change in m_active, then recheck.
430  m_active_cond.Wait();
431  }
432  }
433  }
434 
435  // This is always true, now that IOFileBlock is unsupported.
436 
437  if (filesize == 0)
438  {
439  struct stat st;
440  int res = io->Fstat(st);
441  if (res < 0) {
442  errno = res;
443  TRACE(Error, "GetFile, could not get valid stat");
444  } else if (res > 0) {
445  errno = ENOTSUP;
446  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
447  } else {
448  filesize = st.st_size;
449  }
450  }
451 
452  File *file = 0;
453 
454  if (filesize >= 0)
455  {
456  file = File::FileOpen(path, off, filesize, io->GetInput());
457  }
458 
459  {
460  XrdSysCondVarHelper lock(&m_active_cond);
461 
462  if (file)
463  {
464  inc_ref_cnt(file, false, true);
465  it->second = file;
466 
467  file->AddIO(io);
468  }
469  else
470  {
471  m_active.erase(it);
472  }
473 
474  m_active_cond.Broadcast();
475  }
476 
477  return file;
478 }
479 
481 {
482  // Called from virtual IO::DetachFinalize.
483 
484  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
485 
486  {
487  XrdSysCondVarHelper lock(&m_active_cond);
488 
489  f->RemoveIO(io);
490  }
491  dec_ref_cnt(f, true);
492 }
493 
494 
495 namespace
496 {
497 
498 class DiskSyncer : public XrdJob
499 {
500 private:
501  File *m_file;
502  bool m_high_debug;
503 
504 public:
505  DiskSyncer(File *f, bool high_debug, const char *desc = "") :
506  XrdJob(desc),
507  m_file(f),
508  m_high_debug(high_debug)
509  {}
510 
511  void DoIt()
512  {
513  m_file->Sync();
514  Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
515  delete this;
516  }
517 };
518 
519 
520 class CommandExecutor : public XrdJob
521 {
522 private:
523  std::string m_command_url;
524 
525 public:
526  CommandExecutor(const std::string& command, const char *desc = "") :
527  XrdJob(desc),
528  m_command_url(command)
529  {}
530 
531  void DoIt()
532  {
533  Cache::GetInstance().ExecuteCommandUrl(m_command_url);
534  delete this;
535  }
536 };
537 
538 }
539 
540 //==============================================================================
541 
542 void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
543 {
544  DiskSyncer* ds = new DiskSyncer(f, high_debug);
545 
546  if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
547 
548  schedP->Schedule(ds);
549 }
550 
551 void Cache::FileSyncDone(File* f, bool high_debug)
552 {
553  dec_ref_cnt(f, high_debug);
554 }
555 
556 void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
557 {
558  // called from GetFile() or SheduleFileSync();
559 
560  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
561 
562  if (lock) m_active_cond.Lock();
563  int rc = f->inc_ref_cnt();
564  if (lock) m_active_cond.UnLock();
565 
566  TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
567 }
568 
// Give up one reference on `f`; when it was the last one, finish the file.
//
//   f          - file whose reference count is to be decremented.
//   high_debug - selects the trace level: Debug when true, Dump otherwise.
//
// Flow, as visible below:
//  * File in emergency shutdown: with a count of 1 the file is closed and
//    deleted right away; otherwise the count is decremented and we return.
//  * Count above 1: decrement and return.
//  * Count of 1: if File::FinalizeSyncBeforeExit() returns true, a final sync
//    is scheduled that reuses this reference, and we return.
//  * Otherwise the count is decremented under the lock. When it reaches 0 the
//    m_active entry is set to null, the file is closed, the entry is erased,
//    waiters are woken, an optional "file_close" record is sent to m_gstream,
//    and the File object is deleted.
//
// NOTE(review): this listing skips source line 669, i.e. the last snprintf()
// arguments (for b_todisk, b_prefetch and n_cks_errs) are not visible here.
569 void Cache::dec_ref_cnt(File* f, bool high_debug)
570 {
571  // NOT under active lock.
572  // Called from ReleaseFile(), DiskSync callback and stat-like functions.
573 
574  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
575  int cnt;
576 
577  bool emergency_close = false;
578  {
579  XrdSysCondVarHelper lock(&m_active_cond);
580 
581  cnt = f->get_ref_cnt();
582  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
583 
584  if (f->is_in_emergency_shutdown())
585  {
586  // In this case file has been already removed from m_active map and
587  // does not need to be synced.
588 
589  if (cnt == 1)
590  {
591  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
592  << " -- closing and deleting File object without further ado");
593  emergency_close = true;
594  }
595  else
596  {
597  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
598  << " -- waiting");
599  f->dec_ref_cnt();
600  return;
601  }
602  }
603  if (cnt > 1)
604  {
605  f->dec_ref_cnt();
606  return;
607  }
608  }
// Close() and delete are done after the lock scope above has ended.
609  if (emergency_close)
610  {
611  f->Close();
612  delete f;
613  return;
614  }
615 
616  if (cnt == 1)
617  {
618  if (f->FinalizeSyncBeforeExit())
619  {
620  // Note, here we "reuse" the existing reference count for the
621  // final sync.
622 
623  TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
624  schedule_file_sync(f, true, true);
625  return;
626  }
627  }
628 
629  bool finished_p = false;
630  ActiveMap_i act_it;
631  {
632  XrdSysCondVarHelper lock(&m_active_cond);
633 
634  cnt = f->dec_ref_cnt();
635  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
636  if (cnt == 0)
637  {
// A null File* in the map makes GetFile() wait on m_active_cond until the
// entry is erased and broadcast below. The find() result is not checked
// against m_active.end() before it is dereferenced.
638  act_it = m_active.find(f->GetLocalPath());
639  act_it->second = 0;
640 
641  finished_p = true;
642  }
643  }
644  if (finished_p)
645  {
// Close() runs without the active lock; the lock is re-taken only to erase
// the map entry and wake waiting threads.
646  f->Close();
647  {
648  XrdSysCondVarHelper lock(&m_active_cond);
649  m_active.erase(act_it);
650  m_active_cond.Broadcast();
651  }
652 
653  if (m_gstream)
654  {
655  const Stats &st = f->RefStats();
656  const Info::AStat *as = f->GetLastAccessStats();
657 
// Format a JSON "file_close" record into a fixed 4096-byte buffer.
658  char buf[4096];
659  int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
660  "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
661  "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
662  "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,"
663  "\"b_todisk\":%lld,\"b_prefetch\":%lld,\"n_cks_errs\":%d}",
664  f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
665  f->GetNBlocks(), f->GetNDownloadedBlocks(),
666  (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
667  f->GetRemoteLocations().c_str(),
668  as->BytesHit, as->BytesMissed, as->BytesBypassed,
670  );
// The record is inserted only when it fit into the buffer (len < 4096);
// len + 1 includes the terminating null byte.
671  bool suc = false;
672  if (len < 4096)
673  {
674  suc = m_gstream->Insert(buf, len + 1);
675  }
676  if ( ! suc)
677  {
678  TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
679  }
680  }
681 
682  delete f;
683  }
684 }
685 
686 bool Cache::IsFileActiveOrPurgeProtected(const std::string& path) const
687 {
688  XrdSysCondVarHelper lock(&m_active_cond);
689 
690  return m_active.find(path) != m_active.end() ||
691  m_purge_delay_set.find(path) != m_purge_delay_set.end();
692 }
693 
695 {
696  XrdSysCondVarHelper lock(&m_active_cond);
697  m_purge_delay_set.clear();
698 }
699 
700 //==============================================================================
701 //=== PREFETCH
702 //==============================================================================
703 
705 {
706  // Can be called with other locks held.
707 
708  if ( ! m_prefetch_enabled)
709  {
710  return;
711  }
712 
713  m_prefetch_condVar.Lock();
714  m_prefetchList.push_back(file);
715  m_prefetch_condVar.Signal();
716  m_prefetch_condVar.UnLock();
717 }
718 
719 
721 {
722  // Can be called with other locks held.
723 
724  if ( ! m_prefetch_enabled)
725  {
726  return;
727  }
728 
729  m_prefetch_condVar.Lock();
730  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
731  {
732  if (*it == file)
733  {
734  m_prefetchList.erase(it);
735  break;
736  }
737  }
738  m_prefetch_condVar.UnLock();
739 }
740 
741 
743 {
744  m_prefetch_condVar.Lock();
745  while (m_prefetchList.empty())
746  {
747  m_prefetch_condVar.Wait();
748  }
749 
750  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
751 
752  size_t l = m_prefetchList.size();
753  int idx = rand() % l;
754  File* f = m_prefetchList[idx];
755 
756  m_prefetch_condVar.UnLock();
757  return f;
758 }
759 
760 
762 {
763  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
764 
765  while (true)
766  {
767  m_RAM_mutex.Lock();
768  bool doPrefetch = (m_RAM_used < limit_RAM);
769  m_RAM_mutex.UnLock();
770 
771  if (doPrefetch)
772  {
774  f->Prefetch();
775  }
776  else
777  {
779  }
780  }
781 }
782 
783 
784 //==============================================================================
785 //=== Virtuals from XrdOucCache
786 //==============================================================================
787 
788 //------------------------------------------------------------------------------
802 
803 int Cache::LocalFilePath(const char *curl, char *buff, int blen,
804  LFP_Reason why, bool forall)
805 {
806  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
807  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
808  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
809 
810  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
811 
812  if (buff && blen > 0) buff[0] = 0;
813 
814  XrdCl::URL url(curl);
815  std::string f_name = url.GetPath();
816  std::string i_name = f_name + Info::s_infoExtension;
817 
818  if (why == ForPath)
819  {
820  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
821  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
822  return ret;
823  }
824 
825  {
826  XrdSysCondVarHelper lock(&m_active_cond);
827  m_purge_delay_set.insert(f_name);
828  }
829 
830  struct stat sbuff, sbuff2;
831  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
832  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
833  {
834  if (S_ISDIR(sbuff.st_mode))
835  {
836  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
837  return -EISDIR;
838  }
839  else
840  {
841  bool read_ok = false;
842  bool is_complete = false;
843 
844  // Lock and check if the file is active. If NOT, keep the lock
845  // and add dummy access after successful reading of info file.
846  // If it IS active, just release the lock, this ongoing access will
847  // assure the file continues to exist.
848 
849  // XXXX How can I just loop over the cinfo file when active?
850  // Can I not get is_complete from the existing file?
851  // Do I still want to inject access record?
852  // Oh, it writes only if not active .... still let's try to use existing File.
853 
854  m_active_cond.Lock();
855 
856  bool is_active = m_active.find(f_name) != m_active.end();
857 
858  if (is_active) m_active_cond.UnLock();
859 
860  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
861  XrdOucEnv myEnv;
862  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
863  if (res >= 0)
864  {
865  Info info(m_trace, 0);
866  if (info.Read(infoFile, i_name.c_str()))
867  {
868  read_ok = true;
869 
870  is_complete = info.IsComplete();
871 
872  // Add full-size access if reason is for access.
873  if ( ! is_active && is_complete && why == ForAccess)
874  {
875  info.WriteIOStatSingle(info.GetFileSize());
876  info.Write(infoFile, i_name.c_str());
877  }
878  }
879  infoFile->Close();
880  }
881  delete infoFile;
882 
883  if ( ! is_active) m_active_cond.UnLock();
884 
885  if (read_ok)
886  {
887  if ((is_complete || why == ForInfo) && buff != 0)
888  {
889  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
890  if (res2 < 0)
891  return res2;
892 
893  // Normally, files are owned by us but when direct cache access
894  // is wanted and possible, make sure the file is world readable.
895  if (why == ForAccess)
896  {mode_t mode = (forall ? worldReadable : groupReadable);
897  if (((sbuff.st_mode & worldReadable) != mode)
898  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
899  {is_complete = false;
900  *buff = 0;
901  }
902  }
903  }
904 
905  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
906  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
907 
908  return is_complete ? 0 : -EREMOTE;
909  }
910  }
911  }
912 
913  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
914  return -ENOENT;
915 }
916 
917 //______________________________________________________________________________
918 // If supported, write Cache-Control as xattr to cinfo file.
919 // One can use file descriptor or full path interchangeably
920 //------------------------------------------------------------------------------
921 void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc)
922 {
923  if (m_metaXattr) {
924  int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
925  if (res != 0) {
926  TRACE(Error, "WritecacheControlXAttr error setting xattr " << res);
927  }
928  }
929 }
930 
931 //______________________________________________________________________________
932 // If supported, write file_size as xattr to cinfo file.
933 //------------------------------------------------------------------------------
934 void Cache::WriteFileSizeXAttr(int cinfo_fd, long long file_size)
935 {
936  if (m_metaXattr) {
937  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
938  if (res != 0) {
939  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
940  }
941  }
942 }
943 
944 //______________________________________________________________________________
945 // Determine full size of the data file from the corresponding cinfo-file name.
946 // Attempts to read xattr first and falls back to reading of the cinfo file.
947 // Returns -error on failure.
948 //------------------------------------------------------------------------------
949 long long Cache::DetermineFullFileSize(const std::string &cinfo_fname)
950 {
951  if (m_metaXattr) {
952  char pfn[4096];
953  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
954  long long fsize = -1ll;
955  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
956  if (res == sizeof(long long))
957  {
958  return fsize;
959  }
960  else
961  {
962  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
963  }
964  }
965 
966  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
967  XrdOucEnv env;
968  long long ret;
969  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
970  if (res < 0) {
971  ret = res;
972  } else {
973  Info info(m_trace, 0);
974  if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
975  ret = -EBADF;
976  } else {
977  ret = info.GetFileSize();
978  }
979  infoFile->Close();
980  }
981  delete infoFile;
982  return ret;
983 }
984 
985 //______________________________________________________________________________
986 // Get cache control attributes from the corresponding cinfo-file name.
987 // Returns -error on failure.
988 //------------------------------------------------------------------------------
989 int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& ival) const
990 {
991  if (m_metaXattr) {
992 
993  char pfn[4096];
994  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
995 
996  char cc[512];
997  int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1);
998  if (res > 0)
999  {
1000  std::string tmp(cc, res);
1001  ival = tmp;
1002  }
1003  return res;
1004  }
1005  return 0;
1006 }
1007 
1008 //______________________________________________________________________________
1009 // Get cache control attributes through an open file descriptor.
1010 // Returns the attribute length when a value was read, 0 otherwise.
1011 //------------------------------------------------------------------------------
1012 int Cache::GetCacheControlXAttr(int fd, std::string& ival) const
1013 {
1014  if (m_metaXattr) {
1015  char cc[512];
1016  int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd);
1017  if (res > 0)
1018  {
1019  ival = std::string(cc, res);
1020  return res;
1021  }
1022  }
1023  return 0;
1024 }
1025 
1026 //______________________________________________________________________________
1027 // Calculate if the file is to be considered cached for the purposes of
1028 // only-if-cached and setting of atime of the Stat() calls.
1029 // Returns true if the file is to be considered cached.
1030 //------------------------------------------------------------------------------
1031 bool Cache::DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
1032 {
1033  if (file_size == 0 || bytes_on_disk >= file_size)
1034  return true;
1035 
1036  double frac_on_disk = (double) bytes_on_disk / file_size;
1037 
1038  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
1039  {
1040  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1041  return true;
1042  }
1043  else
1044  {
1045  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
1046  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1047  return true;
1048  }
1049  return false;
1050 }
1051 
1052 //______________________________________________________________________________
1053 // Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
1054 // pfc configuration parameters. The logic of accessing the Info file is the same
1055 // as in Cache::LocalFilePath.
1063 //------------------------------------------------------------------------------
1064 int Cache::ConsiderCached(const char *curl)
1065 {
1066  static const char* tpfx = "ConsiderCached ";
1067 
1068  TRACE(Debug, tpfx << curl);
1069 
1070  XrdCl::URL url(curl);
1071  std::string f_name = url.GetPath();
1072 
1073  File *file = nullptr;
1074  {
1075  XrdSysCondVarHelper lock(&m_active_cond);
1076  auto it = m_active.find(f_name);
1077  if (it != m_active.end()) {
1078  file = it->second;
1079  // If the file-open is in progress, `file` is a nullptr
1080  // so we cannot increase the reference count. For now,
1081  // simply treat it as if the file open doesn't exist instead
1082  // of trying to wait and see if it succeeds.
1083  if (file) {
1084  inc_ref_cnt(file, false, false);
1085  }
1086  }
1087  }
1088  if (file) {
1089  struct stat sbuff;
1090  int res = file->Fstat(sbuff);
1091  dec_ref_cnt(file, false);
1092  if (res)
1093  return res;
1094  // DecideIfConsideredCached() already called in File::Fstat().
1095  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1096  }
1097 
1098  struct stat sbuff;
1099  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1100  if (res != XrdOssOK) {
1101  TRACE(Debug, tpfx << curl << " -> " << res);
1102  return res;
1103  }
1104  if (S_ISDIR(sbuff.st_mode))
1105  {
1106  TRACE(Debug, tpfx << curl << " -> EISDIR");
1107  return -EISDIR;
1108  }
1109 
1110  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1111  if (file_size < 0) {
1112  TRACE(Debug, tpfx << curl << " -> " << file_size);
1113  return (int) file_size;
1114  }
1115  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1116 
1117  return is_cached ? 0 : -EREMOTE;
1118 }
1119 
1120 //______________________________________________________________________________
1128 //------------------------------------------------------------------------------
1129 
1130 int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1131 {
1132  XrdCl::URL url(curl);
1133  std::string f_name = url.GetPath();
1134  std::string i_name = f_name + Info::s_infoExtension;
1135 
1136  // Do not allow write access.
1137  if ((oflags & O_ACCMODE) != O_RDONLY)
1138  {
1139  if (Cache::GetInstance().RefConfiguration().m_write_through)
1140  {
1141  return 0;
1142  }
1143  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1144  return -EROFS;
1145  }
1146 
1147  // Intercept xrdpfc_command requests.
1148  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1149  {
1150  // Schedule a job to process command request.
1151  {
1152  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1153 
1154  schedP->Schedule(ce);
1155  }
1156 
1157  return -EAGAIN;
1158  }
1159 
1160  {
1161  XrdSysCondVarHelper lock(&m_active_cond);
1162  m_purge_delay_set.insert(f_name);
1163  }
1164 
1165  struct stat sbuff;
1166  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1167  {
1168 
1169  if (m_configuration.m_httpcc && !is_http_cache_valid(f_name, i_name, url))
1170  {
1171  TRACE(Info, "Http cache not valid " << f_name);
1172  UnlinkFile(f_name, false);
1173  return 0;
1174  }
1175 
1176  TRACE(Dump, "Prepare defer open " << f_name);
1177  return 1;
1178  }
1179  else
1180  {
1181  return 0;
1182  }
1183 }
1184 
1185 //______________________________________________________________________________
1186 // virtual method of XrdOucCache.
1191 //------------------------------------------------------------------------------
1192 
1193 int Cache::Stat(const char *curl, struct stat &sbuff)
1194 {
1195  const char *tpfx = "Stat ";
1196 
1197  XrdCl::URL url(curl);
1198  std::string f_name = url.GetPath();
1199 
1200  File *file = nullptr;
1201  {
1202  XrdSysCondVarHelper lock(&m_active_cond);
1203  auto it = m_active.find(f_name);
1204  if (it != m_active.end()) {
1205  file = it->second;
1206  // If `file` is nullptr, the file-open is in progress; instead
1207  // of waiting for the file-open to finish, simply treat it as if
1208  // the file-open doesn't exist.
1209  if (file) {
1210  inc_ref_cnt(file, false, false);
1211  }
1212  }
1213  }
1214  if (file) {
1215  int res = file->Fstat(sbuff);
1216  dec_ref_cnt(file, false);
1217  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1218  return res;
1219  }
1220 
1221  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1222  if (res != XrdOssOK) {
1223  TRACE(Debug, tpfx << curl << " -> " << res);
1224  return 1; // res; -- for only-if-cached
1225  }
1226  if (S_ISDIR(sbuff.st_mode))
1227  {
1228  TRACE(Debug, tpfx << curl << " -> EISDIR");
1229  return -EISDIR;
1230  }
1231 
1232  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1233  if (file_size < 0) {
1234  TRACE(Debug, tpfx << curl << " -> " << file_size);
1235  return 1; // (int) file_size; -- for only-if-cached
1236  }
1237  sbuff.st_size = file_size;
1238  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1239  if ( ! is_cached)
1240  sbuff.st_atime = 0;
1241 
1242  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1243 
1244  return 0;
1245 }
1246 
1247 //______________________________________________________________________________
1248 // virtual method of XrdOucCache.
1252 //------------------------------------------------------------------------------
1253 
1254 int Cache::Unlink(const char *curl)
1255 {
1256  XrdCl::URL url(curl);
1257  std::string f_name = url.GetPath();
1258 
1259  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1260 
1261  return UnlinkFile(f_name, false);
1262 }
1263 
1264 int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1265 {
1266  static const char* trc_pfx = "UnlinkFile ";
1267  ActiveMap_i it;
1268  File *file = 0;
1269  long long st_blocks_to_purge = 0;
1270  {
1271  XrdSysCondVarHelper lock(&m_active_cond);
1272 
1273  it = m_active.find(f_name);
1274 
1275  if (it != m_active.end())
1276  {
1277  if (fail_if_open)
1278  {
1279  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1280  return -EBUSY;
1281  }
1282 
1283  // Null File* in m_active map means an operation is ongoing, probably
1284  // Attach() with possible File::Open(). Ask for retry.
1285  if (it->second == 0)
1286  {
1287  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1288  return -EAGAIN;
1289  }
1290 
1291  file = it->second;
1292  st_blocks_to_purge = file->initiate_emergency_shutdown();
1293  it->second = 0;
1294  }
1295  else
1296  {
1297  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1298  }
1299  }
1300 
1301  if (file) {
1302  RemoveWriteQEntriesFor(file);
1303  } else {
1304  struct stat f_stat;
1305  if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1306  st_blocks_to_purge = f_stat.st_blocks;
1307  }
1308 
1309  std::string i_name = f_name + Info::s_infoExtension;
1310 
1311  // Unlink file & cinfo
1312  int f_ret = m_oss->Unlink(f_name.c_str());
1313  int i_ret = m_oss->Unlink(i_name.c_str());
1314 
1315  if (st_blocks_to_purge)
1316  m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1317 
1318  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1319 
1320  {
1321  XrdSysCondVarHelper lock(&m_active_cond);
1322  m_active.erase(it);
1323  m_active_cond.Broadcast();
1324  }
1325 
1326  return std::min(f_ret, i_ret);
1327 }
1328 
1329 //---------------------------------------------------------------------
1333 //---------------------------------------------------------------------
1334 bool Cache::is_http_cache_valid(const std::string& f_name,const std::string& i_name, XrdCl::URL& url)
1335 {
1336  std::string icc;
1337 
1338  // return true if nothing is written in cinfo extended file attributes
1339  if (GetCacheControlXAttr(i_name, icc) <= 0)
1340  return true;
1341 
1342  bool ccIsValid = true;
1343  using namespace nlohmann;
1344  json cc_json = json::parse(icc);
1345 
1346  bool mustRevalidate = cc_json.contains("revalidate") && (cc_json["revalidate"] == true);
1347  bool hasExpired = false;
1348  if (cc_json.contains("expire"))
1349  {
1350  time_t current_time;
1351  current_time = time(NULL);
1352  if (current_time > cc_json["expire"])
1353  hasExpired = true;
1354  }
1355 
1356  if (cc_json.contains("ETag") && (mustRevalidate || hasExpired))
1357  {
1358  // Compare cinfo etag and the etag from file system query response
1359  // Add XrdCl:URL parameter as additional XrdOucCacheOp::Code::QFSinfo sub-command
1360  url.SetParam("code", "head");
1361  std::string response;
1362  std::string fsctlarg = url.GetURL();
1363  int st = XrdPosixExtra::FSctl(XrdOucCacheOp::Code::QFSinfo, fsctlarg, response, false, m_configuration.m_qfsredir);
1364 
1365  if (st >= 0)
1366  {
1367  // client response keeps the \0 at the end of the string
1368  std::string etag = response.substr(0, response.find('\0'));
1369  std::string jval = cc_json["ETag"].get<std::string>();
1370  ccIsValid = (etag == jval);
1371 
1372  TRACE(Info, "Prepare " << i_name << ", ETag valid res: " << ccIsValid);
1373 
1374  // update expiration time if Etag is valid
1375  if (cc_json.contains("max-age"))
1376  {
1377  time_t ma = cc_json["max-age"];
1378  cc_json["expire"] = ma + time(NULL);
1379  char pfn[4096];
1380  m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096);
1381  WriteCacheControlXAttr(-1, pfn, cc_json.dump());
1382  }
1383  }
1384  else
1385  {
1386  // Message has a status because we are in the block condition for cache-control xattr
1387  TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str());
1388  ccIsValid = false;
1389  }
1390  }
1391 
1392  return ccIsValid;
1393 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
nlohmann::json json
#define XrdOssOK
Definition: XrdOss.hh:54
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:52
void * ResourceMonitorThread(void *)
Definition: XrdPfc.cc:58
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:80
void * PrefetchThread(void *)
Definition: XrdPfc.cc:70
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
void * ProcessWriteTaskThread(void *)
Definition: XrdPfc.cc:64
#define stat(a, b)
Definition: XrdPosix.hh:105
bool Debug
@ Error
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
void SetParam(const std::string &name, const std::string &value)
Set a single param.
Definition: XrdClURL.hh:283
Definition: XrdJob.hh:43
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:228
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:954
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:175
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:188
static const int optRW
File is read/write (o/w read/only)
Definition: XrdOucCache.hh:555
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
void PutPtr(const char *varname, void *value)
Definition: XrdOucEnv.cc:316
int get_size() const
Definition: XrdPfcFile.hh:136
File * get_file() const
Definition: XrdPfcFile.hh:140
long long m_offset
Definition: XrdPfcFile.hh:114
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:169
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:949
void FileSyncDone(File *, bool high_debug)
Definition: XrdPfc.cc:551
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:397
static const Configuration & Conf()
Definition: XrdPfc.cc:138
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition: XrdPfc.cc:803
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1193
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:225
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:139
bool IsFileActiveOrPurgeProtected(const std::string &) const
Definition: XrdPfc.cc:686
void ClearPurgeProtectedSet()
Definition: XrdPfc.cc:694
void ReleaseRAM(char *buf, long long size)
Definition: XrdPfc.cc:379
virtual int ConsiderCached(const char *url)
Definition: XrdPfc.cc:1064
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:136
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
int GetCacheControlXAttr(const std::string &cinfo_fname, std::string &res) const
Definition: XrdPfc.cc:989
void DeRegisterPrefetchFile(File *)
Definition: XrdPfc.cc:720
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition: XrdPfc.cc:704
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition: XrdPfc.cc:934
void Prefetch()
Definition: XrdPfc.cc:761
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:480
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition: XrdPfc.cc:229
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:162
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:141
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1264
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
Definition: XrdPfc.hh:312
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:742
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:307
long long WritesSinceLastCall()
Definition: XrdPfc.cc:328
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition: XrdPfc.cc:281
virtual int Unlink(const char *url)
Definition: XrdPfc.cc:1254
void WriteCacheControlXAttr(int cinfo_fd, const char *path, const std::string &cc)
Definition: XrdPfc.cc:921
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:248
static const Cache & TheOne()
Definition: XrdPfc.cc:137
char * RequestRAM(long long size)
Definition: XrdPfc.cc:339
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition: XrdPfc.cc:1130
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:1031
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition: XrdPfc.cc:129
bool is_prefetch_enabled() const
Definition: XrdPfc.hh:317
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
const char * lPath() const
Log path.
void RemoveIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
int GetNBlocks() const
Definition: XrdPfcFile.hh:278
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:275
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:276
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
void Sync()
Sync file cache inf o and output data with disk.
long long GetPrefetchedBytes() const
Definition: XrdPfcFile.hh:280
int GetBlockSize() const
Definition: XrdPfcFile.hh:277
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:279
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
int inc_ref_cnt()
Definition: XrdPfcFile.hh:287
int dec_ref_cnt()
Definition: XrdPfcFile.hh:288
void Prefetch()
int get_ref_cnt()
Definition: XrdPfcFile.hh:286
void WriteBlockToDisk(Block *b)
long long initiate_emergency_shutdown()
long long GetFileSize() const
Definition: XrdPfcFile.hh:267
std::string GetRemoteLocations() const
const Stats & RefStats() const
Definition: XrdPfcFile.hh:281
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:262
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:291
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:444
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:266
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:447
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:442
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:294
void register_file_purge(DirState *target, long long size_in_st_blocks)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
int m_NCksumErrors
number of checksum errors while getting data from remote
Definition: XrdPfcStats.hh:44
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
static int FSctl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp, bool viaCache=false, bool viaRedir=false)
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:162
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
bool Insert(const char *data, int dlen)
@ Warning
Definition: XrdPfc.hh:43
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:66
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:111
bool m_httpcc
enable http cache control
Definition: XrdPfc.hh:139
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:88
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:112
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:110
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:113
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:90
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:132
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:131
Access statistics.
Definition: XrdPfcInfo.hh:57
long long BytesHit
read from cache
Definition: XrdPfcInfo.hh:64
long long BytesBypassed
read from remote and dropped
Definition: XrdPfcInfo.hh:66
time_t DetachTime
close time
Definition: XrdPfcInfo.hh:59
long long BytesMissed
read from remote and cached
Definition: XrdPfcInfo.hh:65
time_t AttachTime
open time
Definition: XrdPfcInfo.hh:58