XRootD
XrdPfc.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include <fcntl.h>
20 #include <sstream>
21 #include <algorithm>
22 #include <sys/statvfs.h>
23 
24 #include "XrdCl/XrdClConstants.hh"
25 #include "XrdCl/XrdClURL.hh"
26 
27 #include "XrdOuc/XrdOucEnv.hh"
28 #include "XrdOuc/XrdOucUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdSys/XrdSysTimer.hh"
33 #include "XrdSys/XrdSysTrace.hh"
34 
36 
37 #include "XrdOss/XrdOss.hh"
38 
39 #include "XrdPfc.hh"
40 #include "XrdPfcTrace.hh"
41 #include "XrdPfcFSctl.hh"
42 #include "XrdPfcInfo.hh"
43 #include "XrdPfcIOFile.hh"
44 #include "XrdPfcIOFileBlock.hh"
45 
46 using namespace XrdPfc;
47 
48 Cache * Cache::m_instance = 0;
49 
51 
52 
54 {
56  return 0;
57 }
58 
59 void *PurgeThread(void*)
60 {
62  return 0;
63 }
64 
66 {
68  return 0;
69 }
70 
71 void *PrefetchThread(void*)
72 {
74  return 0;
75 }
76 
77 //==============================================================================
78 
79 extern "C"
80 {
82  const char *config_filename,
83  const char *parameters,
84  XrdOucEnv *env)
85 {
86  XrdSysError err(logger, "");
87  err.Say("++++++ Proxy file cache initialization started.");
88 
89  if ( ! env ||
90  ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
91  {
94  }
95 
96  Cache &instance = Cache::CreateInstance(logger, env);
97 
98  if (! instance.Config(config_filename, parameters))
99  {
100  err.Say("Config Proxy file cache initialization failed.");
101  return 0;
102  }
103  err.Say("------ Proxy file cache initialization completed.");
104 
105  {
106  pthread_t tid;
107 
108  for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
109  {
110  XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
111  }
112 
113  if (instance.RefConfiguration().m_prefetch_max_blocks > 0)
114  {
115  XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
116  }
117 
118  XrdSysThread::Run(&tid, ResourceMonitorHeartBeatThread, 0, 0, "XrdPfc ResourceMonitorHeartBeat");
119 
120  XrdSysThread::Run(&tid, PurgeThread, 0, 0, "XrdPfc Purge");
121  }
122 
123  XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
124  env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
125 
126  return &instance;
127 }
128 }
129 
130 //==============================================================================
131 
132 void Configuration::calculate_fractional_usages(long long du, long long fu,
133  double &frac_du, double &frac_fu)
134 {
135  // Calculate fractional disk / file usage and clamp them to [0, 1].
136 
137  // Fractional total usage above LWM:
138  // - can be > 1 if usage is above HWM;
139  // - can be < 0 if triggered via age-based-purging.
140  frac_du = (double) (du - m_diskUsageLWM) / (m_diskUsageHWM - m_diskUsageLWM);
141 
142  // Fractional file usage above baseline.
143  // - can be > 1 if file usage is above max;
144  // - can be < 0 if file usage is below baseline.
145  frac_fu = (double) (fu - m_fileUsageBaseline) / (m_fileUsageMax - m_fileUsageBaseline);
146 
147  frac_du = std::min( std::max( frac_du, 0.0), 1.0 );
148  frac_fu = std::min( std::max( frac_fu, 0.0), 1.0 );
149 }
150 
151 //==============================================================================
152 
154 {
155  assert (m_instance == 0);
156  m_instance = new Cache(logger, env);
157  return *m_instance;
158 }
159 
160  Cache& Cache::GetInstance() { return *m_instance; }
161 const Cache& Cache::TheOne() { return *m_instance; }
162 const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
163 
165 {
166  if (! m_decisionpoints.empty())
167  {
168  XrdCl::URL url(io->Path());
169  std::string filename = url.GetPath();
170  std::vector<Decision*>::const_iterator it;
171  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
172  {
173  XrdPfc::Decision *d = *it;
174  if (! d) continue;
175  if (! d->Decide(filename, *m_oss))
176  {
177  return false;
178  }
179  }
180  }
181 
182  return true;
183 }
184 
186  XrdOucCache("pfc"),
187  m_env(env),
188  m_log(logger, "XrdPfc_"),
189  m_trace(new XrdSysTrace("XrdPfc", logger)),
190  m_traceID("Cache"),
191  m_oss(0),
192  m_gstream(0),
193  m_prefetch_condVar(0),
194  m_prefetch_enabled(false),
195  m_RAM_used(0),
196  m_RAM_write_queue(0),
197  m_RAM_std_size(0),
198  m_isClient(false),
199  m_in_purge(false),
200  m_active_cond(0),
201  m_stats_n_purge_cond(0),
202  m_fs_state(0),
203  m_last_scan_duration(0),
204  m_last_purge_duration(0),
205  m_spt_state(SPTS_Idle)
206 {
207  // Default log level is Warning.
208  m_trace->What = 2;
209 }
210 
212 {
213  const char* tpfx = "Attach() ";
214 
215  if (Cache::GetInstance().Decide(io))
216  {
217  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
218 
219  IO *cio;
220 
221  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
222  {
223  cio = new IOFileBlock(io, *this);
224  }
225  else
226  {
227  IOFile *iof = new IOFile(io, *this);
228 
229  if ( ! iof->HasFile())
230  {
231  delete iof;
232  // TODO - redirect instead. But this is kind of an awkward place for it.
233  // errno is set during IOFile construction.
234  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
235  return io;
236  }
237 
238  cio = iof;
239  }
240 
241  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
242  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
243 
244  return cio;
245  }
246  else
247  {
248  TRACE(Info, tpfx << "decision decline " << io->Path());
249  }
250  return io;
251 }
252 
253 void Cache::AddWriteTask(Block* b, bool fromRead)
254 {
255  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
256 
257  {
258  XrdSysMutexHelper lock(&m_RAM_mutex);
259  m_RAM_write_queue += b->get_size();
260  }
261 
262  m_writeQ.condVar.Lock();
263  if (fromRead)
264  m_writeQ.queue.push_back(b);
265  else
266  m_writeQ.queue.push_front(b);
267  m_writeQ.size++;
268  m_writeQ.condVar.Signal();
269  m_writeQ.condVar.UnLock();
270 }
271 
273 {
274  std::list<Block*> removed_blocks;
275  long long sum_size = 0;
276 
277  m_writeQ.condVar.Lock();
278  std::list<Block*>::iterator i = m_writeQ.queue.begin();
279  while (i != m_writeQ.queue.end())
280  {
281  if ((*i)->m_file == file)
282  {
283  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
284  std::list<Block*>::iterator j = i++;
285  removed_blocks.push_back(*j);
286  sum_size += (*j)->get_size();
287  m_writeQ.queue.erase(j);
288  --m_writeQ.size;
289  }
290  else
291  {
292  ++i;
293  }
294  }
295  m_writeQ.condVar.UnLock();
296 
297  {
298  XrdSysMutexHelper lock(&m_RAM_mutex);
299  m_RAM_write_queue -= sum_size;
300  }
301 
302  file->BlocksRemovedFromWriteQ(removed_blocks);
303 }
304 
306 {
307  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
308 
309  while (true)
310  {
311  m_writeQ.condVar.Lock();
312  while (m_writeQ.size == 0)
313  {
314  m_writeQ.condVar.Wait();
315  }
316 
317  // MT -- optimize to pop several blocks if they are available (or swap the list).
318  // This makes sense especially for smallish block sizes.
319 
320  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
321  long long sum_size = 0;
322 
323  for (int bi = 0; bi < n_pushed; ++bi)
324  {
325  Block* block = m_writeQ.queue.front();
326  m_writeQ.queue.pop_front();
327  m_writeQ.writes_between_purges += block->get_size();
328  sum_size += block->get_size();
329 
330  blks_to_write[bi] = block;
331 
332  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
333  }
334  m_writeQ.size -= n_pushed;
335 
336  m_writeQ.condVar.UnLock();
337 
338  {
339  XrdSysMutexHelper lock(&m_RAM_mutex);
340  m_RAM_write_queue -= sum_size;
341  }
342 
343  for (int bi = 0; bi < n_pushed; ++bi)
344  {
345  Block* block = blks_to_write[bi];
346 
347  block->m_file->WriteBlockToDisk(block);
348  }
349  }
350 }
351 
352 //==============================================================================
353 
354 char* Cache::RequestRAM(long long size)
355 {
356  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
357 
358  bool std_size = (size == m_configuration.m_bufferSize);
359 
360  m_RAM_mutex.Lock();
361 
362  long long total = m_RAM_used + size;
363 
364  if (total <= m_configuration.m_RamAbsAvailable)
365  {
366  m_RAM_used = total;
367  if (std_size && m_RAM_std_size > 0)
368  {
369  char *buf = m_RAM_std_blocks.back();
370  m_RAM_std_blocks.pop_back();
371  --m_RAM_std_size;
372 
373  m_RAM_mutex.UnLock();
374 
375  return buf;
376  }
377  else
378  {
379  m_RAM_mutex.UnLock();
380  char *buf;
381  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
382  {
383  // Report out of mem? Probably should report it at least the first time,
384  // then periodically.
385  return 0;
386  }
387  return buf;
388  }
389  }
390  m_RAM_mutex.UnLock();
391  return 0;
392 }
393 
394 void Cache::ReleaseRAM(char* buf, long long size)
395 {
396  bool std_size = (size == m_configuration.m_bufferSize);
397  {
398  XrdSysMutexHelper lock(&m_RAM_mutex);
399 
400  m_RAM_used -= size;
401 
402  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
403  {
404  m_RAM_std_blocks.push_back(buf);
405  ++m_RAM_std_size;
406  return;
407  }
408  }
409  free(buf);
410 }
411 
412 File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
413 {
414  // Called from virtual IO::Attach
415 
416  TRACE(Debug, "GetFile " << path << ", io " << io);
417 
418  ActiveMap_i it;
419 
420  {
421  XrdSysCondVarHelper lock(&m_active_cond);
422 
423  while (true)
424  {
425  it = m_active.find(path);
426 
427  // File is not open or being opened. Mark it as being opened and
428  // proceed to opening it outside of while loop.
429  if (it == m_active.end())
430  {
431  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
432  break;
433  }
434 
435  if (it->second != 0)
436  {
437  it->second->AddIO(io);
438  inc_ref_cnt(it->second, false, true);
439 
440  return it->second;
441  }
442  else
443  {
444  // Wait for some change in m_active, then recheck.
445  m_active_cond.Wait();
446  }
447  }
448  }
449 
450  if (filesize == 0)
451  {
452  struct stat st;
453  int res = io->Fstat(st);
454  if (res < 0) {
455  errno = res;
456  TRACE(Error, "GetFile, could not get valid stat");
457  } else if (res > 0) {
458  errno = ENOTSUP;
459  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
460  } else {
461  filesize = st.st_size;
462  }
463  }
464 
465  File *file = 0;
466 
467  if (filesize >= 0)
468  {
469  file = File::FileOpen(path, off, filesize);
470  }
471 
472  {
473  XrdSysCondVarHelper lock(&m_active_cond);
474 
475  if (file)
476  {
477  inc_ref_cnt(file, false, true);
478  it->second = file;
479 
480  file->AddIO(io);
481  }
482  else
483  {
484  m_active.erase(it);
485  }
486 
487  m_active_cond.Broadcast();
488  }
489 
490  return file;
491 }
492 
494 {
495  // Called from virtual IO::DetachFinalize.
496 
497  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
498 
499  {
500  XrdSysCondVarHelper lock(&m_active_cond);
501 
502  f->RemoveIO(io);
503  }
504  dec_ref_cnt(f, true);
505 }
506 
507 
508 namespace
509 {
510 
511 class DiskSyncer : public XrdJob
512 {
513 private:
514  File *m_file;
515  bool m_high_debug;
516 
517 public:
518  DiskSyncer(File *f, bool high_debug, const char *desc = "") :
519  XrdJob(desc),
520  m_file(f),
521  m_high_debug(high_debug)
522  {}
523 
524  void DoIt()
525  {
526  m_file->Sync();
527  Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
528  delete this;
529  }
530 };
531 
532 
533 class CommandExecutor : public XrdJob
534 {
535 private:
536  std::string m_command_url;
537 
538 public:
539  CommandExecutor(const std::string& command, const char *desc = "") :
540  XrdJob(desc),
541  m_command_url(command)
542  {}
543 
544  void DoIt()
545  {
546  Cache::GetInstance().ExecuteCommandUrl(m_command_url);
547  delete this;
548  }
549 };
550 
551 }
552 
553 //==============================================================================
554 
555 void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
556 {
557  DiskSyncer* ds = new DiskSyncer(f, high_debug);
558 
559  if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
560 
561  schedP->Schedule(ds);
562 }
563 
564 void Cache::FileSyncDone(File* f, bool high_debug)
565 {
566  dec_ref_cnt(f, high_debug);
567 }
568 
569 void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
570 {
571  // called from GetFile() or SheduleFileSync();
572 
573  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
574 
575  if (lock) m_active_cond.Lock();
576  int rc = f->inc_ref_cnt();
577  if (lock) m_active_cond.UnLock();
578 
579  TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
580 }
581 
582 void Cache::dec_ref_cnt(File* f, bool high_debug)
583 {
584  // Called from ReleaseFile() or DiskSync callback.
585 
586  int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
587  int cnt;
588 
589  {
590  XrdSysCondVarHelper lock(&m_active_cond);
591 
592  cnt = f->get_ref_cnt();
593 
594  if (f->is_in_emergency_shutdown())
595  {
596  // In this case file has been already removed from m_active map and
597  // does not need to be synced.
598 
599  if (cnt == 1)
600  {
601  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
602  << " -- deleting File object without further ado");
603  delete f;
604  }
605  else
606  {
607  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
608  << " -- waiting");
609  }
610 
611  return;
612  }
613  }
614 
615  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
616 
617  if (cnt == 1)
618  {
619  if (f->FinalizeSyncBeforeExit())
620  {
621  // Note, here we "reuse" the existing reference count for the
622  // final sync.
623 
624  TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
625  schedule_file_sync(f, true, true);
626  return;
627  }
628  }
629 
630  {
631  XrdSysCondVarHelper lock(&m_active_cond);
632 
633  cnt = f->dec_ref_cnt();
634  TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
635  if (cnt == 0)
636  {
637  ActiveMap_i it = m_active.find(f->GetLocalPath());
638  m_active.erase(it);
639 
640  m_closed_files_stats.insert(std::make_pair(f->GetLocalPath(), f->DeltaStatsFromLastCall()));
641 
642  if (m_gstream)
643  {
644  const Stats &st = f->RefStats();
645  const Info::AStat *as = f->GetLastAccessStats();
646 
647  char buf[4096];
648  int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
649  "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
650  "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
651  "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,\"n_cks_errs\":%d}",
652  f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
653  f->GetNBlocks(), f->GetNDownloadedBlocks(),
654  (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
655  f->GetRemoteLocations().c_str(),
657  );
658  bool suc = false;
659  if (len < 4096)
660  {
661  suc = m_gstream->Insert(buf, len + 1);
662  }
663  if ( ! suc)
664  {
665  TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
666  }
667  }
668 
669  delete f;
670  }
671  }
672 }
673 
674 bool Cache::IsFileActiveOrPurgeProtected(const std::string& path)
675 {
676  XrdSysCondVarHelper lock(&m_active_cond);
677 
678  return m_active.find(path) != m_active.end() ||
679  m_purge_delay_set.find(path) != m_purge_delay_set.end();
680 }
681 
682 
683 //==============================================================================
684 //=== PREFETCH
685 //==============================================================================
686 
688 {
689  // Can be called with other locks held.
690 
691  if ( ! m_prefetch_enabled)
692  {
693  return;
694  }
695 
696  m_prefetch_condVar.Lock();
697  m_prefetchList.push_back(file);
698  m_prefetch_condVar.Signal();
699  m_prefetch_condVar.UnLock();
700 }
701 
702 
704 {
705  // Can be called with other locks held.
706 
707  if ( ! m_prefetch_enabled)
708  {
709  return;
710  }
711 
712  m_prefetch_condVar.Lock();
713  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
714  {
715  if (*it == file)
716  {
717  m_prefetchList.erase(it);
718  break;
719  }
720  }
721  m_prefetch_condVar.UnLock();
722 }
723 
724 
726 {
727  m_prefetch_condVar.Lock();
728  while (m_prefetchList.empty())
729  {
730  m_prefetch_condVar.Wait();
731  }
732 
733  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
734 
735  size_t l = m_prefetchList.size();
736  int idx = rand() % l;
737  File* f = m_prefetchList[idx];
738 
739  m_prefetch_condVar.UnLock();
740  return f;
741 }
742 
743 
745 {
746  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
747 
748  while (true)
749  {
750  m_RAM_mutex.Lock();
751  bool doPrefetch = (m_RAM_used < limit_RAM);
752  m_RAM_mutex.UnLock();
753 
754  if (doPrefetch)
755  {
757  f->Prefetch();
758  }
759  else
760  {
762  }
763  }
764 }
765 
766 
767 //==============================================================================
768 //=== Virtuals from XrdOucCache
769 //==============================================================================
770 
771 //------------------------------------------------------------------------------
785 
786 int Cache::LocalFilePath(const char *curl, char *buff, int blen,
787  LFP_Reason why, bool forall)
788 {
789  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
790  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
791  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
792 
793  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
794 
795  if (buff && blen > 0) buff[0] = 0;
796 
797  XrdCl::URL url(curl);
798  std::string f_name = url.GetPath();
799  std::string i_name = f_name + Info::s_infoExtension;
800 
801  if (why == ForPath)
802  {
803  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
804  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
805  return ret;
806  }
807 
808  {
809  XrdSysCondVarHelper lock(&m_active_cond);
810  m_purge_delay_set.insert(f_name);
811  }
812 
813  struct stat sbuff, sbuff2;
814  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
815  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
816  {
817  if (S_ISDIR(sbuff.st_mode))
818  {
819  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
820  return -EISDIR;
821  }
822  else
823  {
824  bool read_ok = false;
825  bool is_complete = false;
826 
827  // Lock and check if the file is active. If NOT, keep the lock
828  // and add dummy access after successful reading of info file.
829  // If it IS active, just release the lock, this ongoing access will
830  // assure the file continues to exist.
831 
832  // XXXX How can I just loop over the cinfo file when active?
833  // Can I not get is_complete from the existing file?
834  // Do I still want to inject access record?
835  // Oh, it writes only if not active .... still let's try to use existing File.
836 
837  m_active_cond.Lock();
838 
839  bool is_active = m_active.find(f_name) != m_active.end();
840 
841  if (is_active) m_active_cond.UnLock();
842 
843  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
844  XrdOucEnv myEnv;
845  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
846  if (res >= 0)
847  {
848  Info info(m_trace, 0);
849  if (info.Read(infoFile, i_name.c_str()))
850  {
851  read_ok = true;
852 
853  is_complete = info.IsComplete();
854 
855  // Add full-size access if reason is for access.
856  if ( ! is_active && is_complete && why == ForAccess)
857  {
858  info.WriteIOStatSingle(info.GetFileSize());
859  info.Write(infoFile, i_name.c_str());
860  }
861  }
862  infoFile->Close();
863  }
864  delete infoFile;
865 
866  if ( ! is_active) m_active_cond.UnLock();
867 
868  if (read_ok)
869  {
870  if ((is_complete || why == ForInfo) && buff != 0)
871  {
872  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
873  if (res2 < 0)
874  return res2;
875 
876  // Normally, files are owned by us but when direct cache access
877  // is wanted and possible, make sure the file is world readable.
878  if (why == ForAccess)
879  {mode_t mode = (forall ? worldReadable : groupReadable);
880  if (((sbuff.st_mode & worldReadable) != mode)
881  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
882  {is_complete = false;
883  *buff = 0;
884  }
885  }
886  }
887 
888  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
889  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
890 
891  return is_complete ? 0 : -EREMOTE;
892  }
893  }
894  }
895 
896  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
897  return -ENOENT;
898 }
899 
900 //______________________________________________________________________________
901 // Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
902 // pfc configuration parameters. The logic of accessing the Info file is the same
903 // as in Cache::LocalFilePath.
913 //------------------------------------------------------------------------------
914 int Cache::ConsiderCached(const char *curl)
915 {
916  TRACE(Debug, "ConsiderFileCached '" << curl << "'" );
917 
918  XrdCl::URL url(curl);
919  std::string f_name = url.GetPath();
920  std::string i_name = f_name + Info::s_infoExtension;
921 
922  {
923  XrdSysCondVarHelper lock(&m_active_cond);
924  m_purge_delay_set.insert(f_name);
925  }
926 
927  struct stat sbuff, sbuff2;
928  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
929  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
930  {
931  if (S_ISDIR(sbuff.st_mode))
932  {
933  TRACE(Info, "ConsiderCached '" << curl << ", why=ForInfo" << " -> EISDIR");
934  return -EISDIR;
935  }
936  else
937  {
938  bool read_ok = false;
939  bool is_cached = false;
940 
941  // Lock and check if the file is active. If NOT, keep the lock
942  // and add dummy access after successful reading of info file.
943  // If it IS active, just release the lock, this ongoing access will
944  // assure the file continues to exist.
945 
946  // XXXX How can I just loop over the cinfo file when active?
947  // Can I not get is_complete from the existing file?
948  // Do I still want to inject access record?
949  // Oh, it writes only if not active .... still let's try to use existing File.
950 
951  m_active_cond.Lock();
952 
953  bool is_active = m_active.find(f_name) != m_active.end();
954 
955  if (is_active)
956  m_active_cond.UnLock();
957 
958  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
959  XrdOucEnv myEnv;
960  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
961  if (res >= 0)
962  {
963  Info info(m_trace, 0);
964  if (info.Read(infoFile, i_name.c_str()))
965  {
966  read_ok = true;
967 
968  if (info.IsComplete())
969  {
970  is_cached = true;
971  }
972  else if (info.GetFileSize() == 0)
973  {
974  is_cached = true;
975  }
976  else
977  {
978  long long fileSize = info.GetFileSize();
979  long long bytesRead = info.GetNDownloadedBytes();
980 
981  if (fileSize < m_configuration.m_onlyIfCachedMinSize)
982  {
983  if ((float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
984  is_cached = true;
985  }
986  else
987  {
988  if (bytesRead > m_configuration.m_onlyIfCachedMinSize &&
989  (float)bytesRead / fileSize > m_configuration.m_onlyIfCachedMinFrac)
990  is_cached = true;
991  }
992  }
993  }
994  infoFile->Close();
995  }
996  delete infoFile;
997 
998  if (!is_active) m_active_cond.UnLock();
999 
1000  if (read_ok)
1001  {
1002  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << (is_cached ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
1003  return is_cached ? 0 : -EREMOTE;
1004  }
1005  }
1006  }
1007 
1008  TRACE(Info, "ConsiderCached '" << curl << "', why=ForInfo" << " -> ENOENT");
1009  return -ENOENT;
1010 }
1011 
1012 //______________________________________________________________________________
1020 //------------------------------------------------------------------------------
1021 
1022 int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1023 {
1024  XrdCl::URL url(curl);
1025  std::string f_name = url.GetPath();
1026  std::string i_name = f_name + Info::s_infoExtension;
1027 
1028  // Do not allow write access.
1029  if (oflags & (O_WRONLY | O_RDWR | O_APPEND | O_CREAT))
1030  {
1031  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1032  return -EROFS;
1033  }
1034 
1035  // Intercept xrdpfc_command requests.
1036  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1037  {
1038  // Schedule a job to process command request.
1039  {
1040  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1041 
1042  schedP->Schedule(ce);
1043  }
1044 
1045  return -EAGAIN;
1046  }
1047 
1048  {
1049  XrdSysCondVarHelper lock(&m_active_cond);
1050  m_purge_delay_set.insert(f_name);
1051  }
1052 
1053  struct stat sbuff;
1054  int res = m_oss->Stat(i_name.c_str(), &sbuff);
1055  if (res == 0)
1056  {
1057  TRACE(Dump, "Prepare defer open " << f_name);
1058  return 1;
1059  }
1060  else
1061  {
1062  return 0;
1063  }
1064 }
1065 
1066 //______________________________________________________________________________
1067 // virtual method of XrdOucCache.
1072 //------------------------------------------------------------------------------
1073 
1074 int Cache::Stat(const char *curl, struct stat &sbuff)
1075 {
1076  XrdCl::URL url(curl);
1077  std::string f_name = url.GetPath();
1078 
1079  {
1080  XrdSysCondVarHelper lock(&m_active_cond);
1081  m_purge_delay_set.insert(f_name);
1082  }
1083 
1084  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK)
1085  {
1086  if (S_ISDIR(sbuff.st_mode))
1087  {
1088  return 0;
1089  }
1090  else
1091  {
1092  bool success = false;
1093  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
1094  XrdOucEnv myEnv;
1095 
1096  f_name += Info::s_infoExtension;
1097  int res = infoFile->Open(f_name.c_str(), O_RDONLY, 0600, myEnv);
1098  if (res >= 0)
1099  {
1100  Info info(m_trace, 0);
1101  if (info.Read(infoFile, f_name.c_str()))
1102  {
1103  sbuff.st_size = info.GetFileSize();
1104  success = true;
1105  }
1106  }
1107  infoFile->Close();
1108  delete infoFile;
1109  return success ? 0 : 1;
1110  }
1111  }
1112 
1113  return 1;
1114 }
1115 
1116 //______________________________________________________________________________
1117 // virtual method of XrdOucCache.
1121 //------------------------------------------------------------------------------
1122 
1123 int Cache::Unlink(const char *curl)
1124 {
1125  XrdCl::URL url(curl);
1126  std::string f_name = url.GetPath();
1127 
1128  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1129 
1130  return UnlinkFile(f_name, false);
1131 }
1132 
1133 int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1134 {
1135  ActiveMap_i it;
1136  File *file = 0;
1137  {
1138  XrdSysCondVarHelper lock(&m_active_cond);
1139 
1140  it = m_active.find(f_name);
1141 
1142  if (it != m_active.end())
1143  {
1144  if (fail_if_open)
1145  {
1146  TRACE(Info, "UnlinkCommon " << f_name << ", file currently open and force not requested - denying request");
1147  return -EBUSY;
1148  }
1149 
1150  // Null File* in m_active map means an operation is ongoing, probably
1151  // Attach() with possible File::Open(). Ask for retry.
1152  if (it->second == 0)
1153  {
1154  TRACE(Info, "UnlinkCommon " << f_name << ", an operation on this file is ongoing - denying request");
1155  return -EAGAIN;
1156  }
1157 
1158  file = it->second;
1160  it->second = 0;
1161  }
1162  else
1163  {
1164  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1165  }
1166  }
1167 
1168  if (file)
1169  {
1170  RemoveWriteQEntriesFor(file);
1171  }
1172 
1173  std::string i_name = f_name + Info::s_infoExtension;
1174 
1175  // Unlink file & cinfo
1176  int f_ret = m_oss->Unlink(f_name.c_str());
1177  int i_ret = m_oss->Unlink(i_name.c_str());
1178 
1179  TRACE(Debug, "UnlinkCommon " << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1180 
1181  {
1182  XrdSysCondVarHelper lock(&m_active_cond);
1183 
1184  m_active.erase(it);
1185  }
1186 
1187  return std::min(f_ret, i_ret);
1188 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define TRACE_Debug
Definition: XrdCmsTrace.hh:37
#define XrdOssOK
Definition: XrdOss.hh:50
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
Definition: XrdPfcTrace.hh:11
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:55
#define TRACE_INT(act, x)
Definition: XrdPfcTrace.hh:47
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:81
void * PrefetchThread(void *)
Definition: XrdPfc.cc:71
void * PurgeThread(void *)
Definition: XrdPfc.cc:59
void * ResourceMonitorHeartBeatThread(void *)
Definition: XrdPfc.cc:53
void * ProcessWriteTaskThread(void *)
Definition: XrdPfc.cc:65
int stat(const char *path, struct stat *buf)
@ Warning
#define TRACE(act, x)
Definition: XrdTrace.hh:63
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
Definition: XrdJob.hh:43
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:873
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
virtual const char * Path()=0
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:148
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:161
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition: XrdOucEnv.cc:298
int get_size() const
Definition: XrdPfcFile.hh:146
File * get_file() const
Definition: XrdPfcFile.hh:150
long long m_offset
Definition: XrdPfcFile.hh:124
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:267
void FileSyncDone(File *, bool high_debug)
Definition: XrdPfc.cc:564
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:412
static const Configuration & Conf()
Definition: XrdPfc.cc:162
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition: XrdPfc.cc:786
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1074
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:315
void Purge()
Thread function invoked to scan and purge files from disk when needed.
Definition: XrdPfcPurge.cc:698
void ReleaseRAM(char *buf, long long size)
Definition: XrdPfc.cc:394
virtual int ConsiderCached(const char *url)
Definition: XrdPfc.cc:914
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:160
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
Definition: XrdPfcPurge.cc:606
void DeRegisterPrefetchFile(File *)
Definition: XrdPfc.cc:703
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition: XrdPfc.cc:687
void Prefetch()
Definition: XrdPfc.cc:744
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:493
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition: XrdPfc.cc:253
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:185
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:164
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1133
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
Definition: XrdPfc.hh:404
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition: XrdPfc.cc:674
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:725
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition: XrdPfc.cc:305
virtual int Unlink(const char *url)
Definition: XrdPfc.cc:1123
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:272
static const Cache & TheOne()
Definition: XrdPfc.cc:161
char * RequestRAM(long long size)
Definition: XrdPfc.cc:354
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition: XrdPfc.cc:1022
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition: XrdPfc.cc:153
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
Definition: XrdPfcFile.cc:271
const char * lPath() const
Log path.
Definition: XrdPfcFile.cc:1416
void WriteBlockToDisk(Block *b)
Definition: XrdPfcFile.cc:963
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition: XrdPfcFile.cc:99
int GetNBlocks() const
Definition: XrdPfcFile.hh:292
void Prefetch()
Definition: XrdPfcFile.cc:1431
std::string GetRemoteLocations() const
Definition: XrdPfcFile.cc:1528
const Info::AStat * GetLastAccessStats() const
Definition: XrdPfcFile.hh:289
size_t GetAccessCnt() const
Definition: XrdPfcFile.hh:290
void AddIO(IO *io)
Definition: XrdPfcFile.cc:295
int GetBlockSize() const
Definition: XrdPfcFile.hh:291
int GetNDownloadedBlocks() const
Definition: XrdPfcFile.hh:293
long long GetFileSize()
Definition: XrdPfcFile.hh:279
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
Definition: XrdPfcFile.cc:165
void initiate_emergency_shutdown()
Definition: XrdPfcFile.cc:112
int inc_ref_cnt()
Definition: XrdPfcFile.hh:298
void Sync()
Sync file cache inf o and output data with disk.
Definition: XrdPfcFile.cc:1046
int dec_ref_cnt()
Definition: XrdPfcFile.hh:299
int get_ref_cnt()
Definition: XrdPfcFile.hh:297
const Stats & RefStats() const
Definition: XrdPfcFile.hh:294
void RemoveIO(IO *io)
Definition: XrdPfcFile.cc:332
std::string & GetLocalPath()
Definition: XrdPfcFile.hh:274
Stats DeltaStatsFromLastCall()
Definition: XrdPfcFile.cc:142
bool is_in_emergency_shutdown()
Definition: XrdPfcFile.hh:302
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:40
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:49
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:18
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:45
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:313
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:446
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
Definition: XrdPfcInfo.hh:411
bool IsComplete() const
Get complete status.
Definition: XrdPfcInfo.hh:451
long long GetFileSize() const
Get file size.
Definition: XrdPfcInfo.hh:446
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Definition: XrdPfcInfo.cc:296
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:31
int m_NCksumErrors
number of checksum errors while getting data from remote
Definition: XrdPfcStats.hh:39
void Schedule(XrdJob *jp)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227
bool Insert(const char *data, int dlen)
Definition: XrdPfc.hh:41
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:56
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:102
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:88
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:87
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:106
void calculate_fractional_usages(long long du, long long fu, double &frac_du, double &frac_fu)
Definition: XrdPfc.cc:132
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:101
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:81
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:116
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:115
Access statistics.
Definition: XrdPfcInfo.hh:61
long long BytesHit
read from cache
Definition: XrdPfcInfo.hh:68
long long BytesBypassed
read from remote and dropped
Definition: XrdPfcInfo.hh:70
time_t DetachTime
close time
Definition: XrdPfcInfo.hh:63
long long BytesMissed
read from remote and cached
Definition: XrdPfcInfo.hh:69
time_t AttachTime
open time
Definition: XrdPfcInfo.hh:62