XRootD
XrdPfc::Cache Class Reference

Attaches/creates and detaches/deletes cache-io objects for disk based cache. More...

#include <XrdPfc.hh>

+ Inheritance diagram for XrdPfc::Cache:
+ Collaboration diagram for XrdPfc::Cache:

Public Member Functions

 Cache (XrdSysLogger *logger, XrdOucEnv *env)
 Constructor. More...
 
void AddWriteTask (Block *b, bool from_read)
 Add downloaded block to the write queue. More...
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *, int Options=0)
 
virtual XrdOucCacheIO * Attach (XrdOucCacheIO *ioP, int opts=0)=0
 Obtain a new IO object that fronts existing XrdOucCacheIO. More...
 
bool blocksize_str2value (const char *from, const char *str, long long &val, long long min, long long max) const
 
void ClearPurgeProtectedSet ()
 
bool Config (const char *config_filename, const char *parameters, XrdOucEnv *env)
 Parse configuration file. More...
 
virtual int ConsiderCached (const char *url)
 
bool Decide (XrdOucCacheIO *)
 Makes decision if the original XrdOucCacheIO should be cached. More...
 
bool DecideIfConsideredCached (long long file_size, long long bytes_on_disk)
 
void DeRegisterPrefetchFile (File *)
 
long long DetermineFullFileSize (const std::string &cinfo_fname)
 
void ExecuteCommandUrl (const std::string &command_url)
 
void FileSyncDone (File *, bool high_debug)
 
int GetCacheControlXAttr (const std::string &cinfo_fname, std::string &res) const
 
int GetCacheControlXAttr (int fd, std::string &res) const
 
File * GetFile (const std::string &, IO *, long long off=0, long long filesize=0)
 
XrdXrootdGStream * GetGStream ()
 
XrdSysError * GetLog () const
 
File * GetNextFileToPrefetch ()
 
XrdOss * GetOss () const
 
PurgePin * GetPurgePin () const
 
XrdSysTrace * GetTrace () const
 
bool is_prefetch_enabled () const
 
bool IsFileActiveOrPurgeProtected (const std::string &) const
 
virtual int LocalFilePath (const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
 
void Prefetch ()
 
bool prefetch_str2value (const char *from, const char *str, int &val, int min, int max) const
 
virtual int Prepare (const char *url, int oflags, mode_t mode)
 
void ProcessWriteTasks ()
 Separate task which writes blocks from ram to disk. More...
 
const Configuration & RefConfiguration () const
 Reference XrdPfc configuration. More...
 
ResourceMonitor & RefResMon ()
 
void RegisterPrefetchFile (File *)
 
void ReleaseFile (File *, IO *)
 
void ReleaseRAM (char *buf, long long size)
 
void RemoveWriteQEntriesFor (File *f)
 Remove blocks from write queue which belong to given prefetch. This method is used at the time of File destruction. More...
 
char * RequestRAM (long long size)
 
void ScheduleFileSync (File *f)
 
virtual int Stat (const char *url, struct stat &sbuff)
 
virtual int Unlink (const char *url)
 
int UnlinkFile (const std::string &f_name, bool fail_if_open)
 Remove cinfo and data files from cache. More...
 
void WriteCacheControlXAttr (int cinfo_fd, const char *path, const std::string &cc)
 
void WriteFileSizeXAttr (int cinfo_fd, long long file_size)
 
long long WritesSinceLastCall ()
 
- Public Member Functions inherited from XrdOucCache
 XrdOucCache (const char *ctype)
 
virtual ~XrdOucCache ()
 Destructor. More...
 
virtual int Fcntl (XrdOucCacheOp::Code opc, const std::string &args, std::string &resp)
 
virtual int Rename (const char *oldp, const char *newp)
 
virtual int Rmdir (const char *dirp)
 
virtual int Truncate (const char *path, off_t size)
 
virtual int Xeq (XeqCmd cmd, char *arg, int arglen)
 

Static Public Member Functions

static const Configuration & Conf ()
 
static Cache & CreateInstance (XrdSysLogger *logger, XrdOucEnv *env)
 Singleton creation. More...
 
static Cache & GetInstance ()
 Singleton access. More...
 
static ResourceMonitor & ResMon ()
 
static const Cache & TheOne ()
 
static bool VCheck (XrdVersionInfo &urVersion)
 Version check. More...
 

Static Public Attributes

static XrdScheduler * schedP = nullptr
 
- Static Public Attributes inherited from XrdOucCache
static const int optFIS = 0x0001
 File is structured (e.g. root file) More...
 
static const int optNEW = 0x0014
 File is new -> optRW (o/w read or write) More...
 
static const int optRW = 0x0004
 File is read/write (o/w read/only) More...
 
static const int optWIN = 0x0024
 File is new -> optRW use write-in cache. More...
 

Additional Inherited Members

- Public Types inherited from XrdOucCache
enum  LFP_Reason {
  ForAccess =0 ,
  ForInfo ,
  ForPath
}
 
enum  XeqCmd { xeqNoop = 0 }
 
- Public Attributes inherited from XrdOucCache
const char CacheType [8]
 A 1-to-7 character cache type identifier (usually pfc or rmc). More...
 
XrdOucCacheStats Statistics
 

Detailed Description

Attaches/creates and detaches/deletes cache-io objects for disk based cache.

Definition at line 168 of file XrdPfc.hh.

Constructor & Destructor Documentation

◆ Cache()

Cache::Cache ( XrdSysLogger * logger,
XrdOucEnv * env 
)

Constructor.

Definition at line 162 of file XrdPfc.cc.

162  :
163  XrdOucCache("pfc"),
164  m_env(env),
165  m_log(logger, "XrdPfc_"),
166  m_trace(new XrdSysTrace("XrdPfc", logger)),
167  m_traceID("Cache"),
168  m_oss(0),
169  m_gstream(0),
170  m_purge_pin(0),
171  m_prefetch_condVar(0),
172  m_prefetch_enabled(false),
173  m_RAM_used(0),
174  m_RAM_write_queue(0),
175  m_RAM_std_size(0),
176  m_isClient(false),
177  m_active_cond(0)
178 {
179  // Default log level is Warning.
180  m_trace->What = 2;
181 }
XrdOucCache(const char *ctype)
Definition: XrdOucCache.hh:754

References XrdSysTrace::What.

Referenced by CreateInstance().

+ Here is the caller graph for this function:

Member Function Documentation

◆ AddWriteTask()

void Cache::AddWriteTask ( Block * b,
bool  from_read 
)

Add downloaded block to the write queue.

Definition at line 229 of file XrdPfc.cc.

230 {
231  TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
232 
233  {
234  XrdSysMutexHelper lock(&m_RAM_mutex);
235  m_RAM_write_queue += b->get_size();
236  }
237 
238  m_writeQ.condVar.Lock();
239  if (fromRead)
240  m_writeQ.queue.push_back(b);
241  else
242  m_writeQ.queue.push_front(b);
243  m_writeQ.size++;
244  m_writeQ.condVar.Signal();
245  m_writeQ.condVar.UnLock();
246 }
#define TRACE(act, x)
Definition: XrdTrace.hh:63
int get_size() const
Definition: XrdPfcFile.hh:136
File * get_file() const
Definition: XrdPfcFile.hh:140
long long m_offset
Definition: XrdPfcFile.hh:114
const std::string & GetLocalPath() const
Definition: XrdPfcFile.hh:262

References XrdPfc::Block::get_file(), XrdPfc::Block::get_size(), XrdPfc::File::GetLocalPath(), XrdPfc::Block::m_offset, and TRACE.

+ Here is the call graph for this function:

◆ Attach() [1/2]

XrdOucCacheIO * Cache::Attach ( XrdOucCacheIO * io,
int  Options = 0 
)
virtual

Implements XrdOucCache.

Definition at line 183 of file XrdPfc.cc.

184 {
185  const char* tpfx = "Attach() ";
186 
187  if (Options & XrdOucCache::optRW)
188  {
189  TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
190  }
191  else if (Cache::GetInstance().Decide(io))
192  {
193  TRACE(Info, tpfx << obfuscateAuth(io->Path()));
194 
195  IO *cio;
196 
197  if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
198  {
199  cio = new IOFileBlock(io, *this);
200  }
201  else
202  {
203  IOFile *iof = new IOFile(io, *this);
204 
205  if ( ! iof->HasFile())
206  {
207  delete iof;
208  // TODO - redirect instead. But this is kind of an awkward place for it.
209  // errno is set during IOFile construction.
210  TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
211  return io;
212  }
213 
214  cio = iof;
215  }
216 
217  TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
218  ((loc && loc[0] != 0) ? loc : "<deferred open>"));
219 
220  return cio;
221  }
222  else
223  {
224  TRACE(Info, tpfx << "decision decline " << io->Path());
225  }
226  return io;
227 }
std::string obfuscateAuth(const std::string &input)
#define TRACE_PC(act, pre_code, x)
Definition: XrdPfcTrace.hh:59
bool Debug
@ Error
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
Definition: XrdOucCache.hh:188
static const int optRW
File is read/write (o/w read/only)
Definition: XrdOucCache.hh:555
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:225
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:136
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:141
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
Definition: XrdPfcIOFile.hh:39
bool HasFile() const
Check if File was opened successfully.
Definition: XrdPfcIOFile.hh:48
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41

References Debug, Decide(), Error, GetInstance(), XrdPfc::IOFile::HasFile(), XrdOucCacheIO::Location(), obfuscateAuth(), XrdOucCache::optRW, XrdOucCacheIO::Path(), RefConfiguration(), TRACE, and TRACE_PC.

+ Here is the call graph for this function:

◆ Attach() [2/2]

virtual XrdOucCacheIO* XrdOucCache::Attach

Obtain a new IO object that fronts existing XrdOucCacheIO.

◆ blocksize_str2value()

bool Cache::blocksize_str2value ( const char *  from,
const char *  str,
long long &  val,
long long  min,
long long  max 
) const

Definition at line 106 of file XrdPfcConfiguration.cc.

108 {
109  if (XrdOuca2x::a2sz(m_log, "Error parsing block-size", str, &val, min, max))
110  return false;
111 
112  if (val & 0xFFF) {
113  val &= ~0x0FFF;
114  val += 0x1000;
115  m_log.Emsg(from, "blocksize must be a multiple of 4 kB. Rounded up.");
116  }
117 
118  return true;
119 }
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116

References XrdOuca2x::a2sz(), and XrdSysError::Emsg().

+ Here is the call graph for this function:

◆ ClearPurgeProtectedSet()

void Cache::ClearPurgeProtectedSet ( )

Definition at line 694 of file XrdPfc.cc.

695 {
696  XrdSysCondVarHelper lock(&m_active_cond);
697  m_purge_delay_set.clear();
698 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), and XrdPfc::ResourceMonitor::perform_purge_task_cleanup().

+ Here is the caller graph for this function:

◆ Conf()

const Configuration & Cache::Conf ( )
static

Definition at line 138 of file XrdPfc.cc.

138 { return m_instance->RefConfiguration(); }

References RefConfiguration().

Referenced by XrdPfc::ResourceMonitor::heart_beat(), XrdPfc::OldStylePurgeDriver(), XrdPfc::ResourceMonitor::perform_purge_check(), Proto_ResourceMonitorHeartBeat(), XrdPfc::ResourceMonitor::update_vs_and_file_usage_info(), and XrdPfc::DataFsSnapshot::write_json_file().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Config()

bool Cache::Config ( const char *  config_filename,
const char *  parameters,
XrdOucEnv * env 
)

Parse configuration file.

Parameters
config_filename  path to configuration file
parameters  optional parameters to be passed
env  optional environment to use for configuration
Returns
parse status

Definition at line 434 of file XrdPfcConfiguration.cc.

435 {
436  // Indicate whether or not we are a client instance
437  const char *theINS = getenv("XRDINSTANCE");
438  m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
439 
440  // Tell everyone else we are a caching proxy
441  XrdOucEnv::Export("XRDPFC", 1);
442 
443  XrdOucEnv emptyEnv;
444  XrdOucEnv *myEnv = env ? env : &emptyEnv;
445 
446  XrdOucStream Config(&m_log, theINS, myEnv, "=====> ");
447 
448  if (! config_filename || ! *config_filename)
449  {
450  TRACE(Error, "Config() configuration file not specified.");
451  return false;
452  }
453 
454  int fd;
455  if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
456  {
457  TRACE( Error, "Config() can't open configuration file " << config_filename);
458  return false;
459  }
460 
461  Config.Attach(fd);
462  static const char *cvec[] = { "*** pfc plugin config:", 0 };
463  Config.Capture(cvec);
464 
465  // Obtain OFS configurator for OSS plugin.
466  XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
467  &XrdVERSIONINFOVAR(XrdOucGetCache));
468  if (! ofsCfg) return false;
469 
470  TmpConfiguration tmpc;
471 
472  Configuration &CFG = m_configuration;
473 
474  // Adjust default parameters for client/serverless caching
475  if (m_isClient)
476  {
477  m_configuration.m_bufferSize = 128 * 1024; // same as normal.
478  m_configuration.m_wqueue_blocks = 8;
479  m_configuration.m_wqueue_threads = 1;
480  }
481 
482  // If network checksum processing is the default, indicate so.
483  if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
484 
485  // Actual parsing of the config file.
486  bool retval = true, aOK = true;
487  char *var;
488  while ((var = Config.GetMyFirstWord()))
489  {
490  if (! strcmp(var,"pfc.osslib"))
491  {
492  retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
493  }
494  else if (! strcmp(var,"pfc.cschk"))
495  {
496  retval = xcschk(Config);
497  }
498  else if (! strcmp(var,"pfc.decisionlib"))
499  {
500  retval = xdlib(Config);
501  }
502  else if (! strcmp(var,"pfc.purgelib"))
503  {
504  retval = xplib(Config);
505  }
506  else if (! strcmp(var,"pfc.trace"))
507  {
508  retval = xtrace(Config);
509  }
510  else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
511  {
512  m_configuration.m_allow_xrdpfc_command = true;
513  }
514  else if (! strncmp(var,"pfc.", 4))
515  {
516  retval = ConfigParameters(std::string(var+4), Config, tmpc);
517  }
518 
519  if ( ! retval)
520  {
521  TRACE(Error, "Config() error in parsing");
522  aOK = false;
523  }
524  }
525 
526  Config.Close();
527 
528  // Load OSS plugin.
529  auto orig_runmode = myEnv->Get("oss.runmode");
530  myEnv->Put("oss.runmode", "pfc");
531  if (m_configuration.is_cschk_cache())
532  {
533  char csi_conf[128];
534  if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
535  {
536  ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
537  } else {
538  TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
539  return false;
540  }
541  }
542  if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, myEnv))
543  {
544  ofsCfg->Plugin(m_oss);
545  }
546  else
547  {
548  TRACE(Error, "Config() Unable to create an OSS object");
549  return false;
550  }
551  if (orig_runmode) myEnv->Put("oss.runmode", orig_runmode);
552  else myEnv->Put("oss.runmode", "");
553 
554  // Test if OSS is operational, determine optional features.
555  aOK &= test_oss_basics_and_features();
556 
557  // sets default value for disk usage
558  XrdOssVSInfo sP;
559  {
560  if (m_configuration.m_meta_space != m_configuration.m_data_space &&
561  m_oss->StatVS(&sP, m_configuration.m_meta_space.c_str(), 1) < 0)
562  {
563  m_log.Emsg("ConfigParameters()", "error obtaining stat info for meta space ", m_configuration.m_meta_space.c_str());
564  return false;
565  }
566  if (m_configuration.m_meta_space != m_configuration.m_data_space && sP.Total < 10ll << 20)
567  {
568  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
569  m_configuration.m_meta_space.c_str());
570  return false;
571  }
572  if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
573  {
574  m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
575  return false;
576  }
577  if (sP.Total < 10ll << 20)
578  {
579  m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
580  m_configuration.m_data_space.c_str());
581  return false;
582  }
583 
584  m_configuration.m_diskTotalSpace = sP.Total;
585 
586  if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
587  cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
588  {
589  if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
590  m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
591  aOK = false;
592  }
593  }
594  else aOK = false;
595 
596  if ( ! tmpc.m_fileUsageMax.empty())
597  {
598  if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
599  cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
600  cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
601  {
602  if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
603  m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
604  m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
605  {
606  m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
607  aOK = false;
608  }
609 
610 
611  if (aOK && m_configuration.m_fileUsageMax >= m_configuration.m_diskUsageLWM)
612  {
613  m_log.Emsg("ConfigParameters()", "pfc.diskusage files values must be below lowWatermark");
614  aOK = false;
615  }
616  }
617  else aOK = false;
618  }
619  }
620 
621  // sets flush frequency
622  if ( ! tmpc.m_flushRaw.empty())
623  {
624  if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
625  {
626  if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
627  &m_configuration.m_flushCnt,
628  100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
629  {
630  return false;
631  }
632  m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
633  }
634  else
635  {
636  if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
637  &m_configuration.m_flushCnt, 100, 100000))
638  {
639  return false;
640  }
641  }
642  }
643 
644  // get number of available RAM blocks after process configuration
645  if (m_configuration.m_RamAbsAvailable == 0)
646  {
647  m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
648  char buff[1024];
649  snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
650  m_log.Say("Config info: ", buff);
651  }
652  // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
653  m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
654 
655  // Set tracing to debug if this is set in environment
656  char* cenv = getenv("XRDDEBUG");
657  if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
658 
659  if (aOK)
660  {
661 // 000 001 010
662  const char *csc[] = {"off", "cache nonet", "nocache net notls",
663 // 011
664  "cache net notls",
665 // 100 101 110
666  "off", "cache nonet", "nocache net tls",
667 // 111
668  "cache net tls"};
669  char uvk[32];
670  if (m_configuration.m_cs_UVKeep < 0)
671  strcpy(uvk, "lru");
672  else
673  sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
674  float ram_gb = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
675 
676  char urlcgi_blks[64] = "ignore", urlcgi_npref[32] = "ignore";
677  if (CFG.m_cgi_blocksize_allowed)
678  snprintf(urlcgi_blks, sizeof(urlcgi_blks), "%lldk %lldk",
679  CFG.m_cgi_min_bufferSize >> 10, CFG.m_cgi_max_bufferSize >> 10);
680  if (CFG.m_cgi_prefetch_allowed)
681  snprintf(urlcgi_npref, sizeof(urlcgi_npref), "%d %d",
682  CFG.m_cgi_min_prefetch_max_blocks, CFG.m_cgi_max_prefetch_max_blocks);
683 
684  char buff[8192];
685  int loff = 0;
686  loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
687  " pfc.cschk %s uvkeep %s\n"
688  " pfc.blocksize %lldk\n"
689  " pfc.prefetch %d\n"
690  " pfc.urlcgi blocksize %s prefetch %s\n"
691  " pfc.ram %.fg\n"
692  " pfc.writequeue %d %d\n"
693  " # Total available disk: %lld\n"
694  " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
695  " pfc.spaces %s %s\n"
696  " pfc.trace %d\n"
697  " pfc.flush %lld\n"
698  " pfc.acchistorysize %d\n"
699  " pfc.onlyIfCachedMinBytes %lld\n"
700  " pfc.onlyIfCachedMinFrac %.2f\n",
701  config_filename,
702  csc[int(m_configuration.m_cs_Chk)], uvk,
703  m_configuration.m_bufferSize >> 10,
704  m_configuration.m_prefetch_max_blocks,
705  urlcgi_blks, urlcgi_npref,
706  ram_gb,
707  m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
708  sP.Total,
709  m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
710  m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
711  m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
712  m_configuration.m_data_space.c_str(),
713  m_configuration.m_meta_space.c_str(),
714  m_trace->What,
715  m_configuration.m_flushCnt,
716  m_configuration.m_accHistorySize,
717  m_configuration.m_onlyIfCachedMinSize,
718  m_configuration.m_onlyIfCachedMinFrac);
719 
720  if (m_configuration.is_dir_stat_reporting_on())
721  {
722  loff += snprintf(buff + loff, sizeof(buff) - loff,
723  " pfc.dirstats interval %d maxdepth %d (internal: size_of_dirlist %d, size_of_globlist %d)\n",
724  m_configuration.m_dirStatsInterval, m_configuration.m_dirStatsStoreDepth,
725  (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
726  loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
727  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
728  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
729  loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
730  for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
731  loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
732  }
733 
734  if (m_configuration.m_hdfsmode)
735  {
736  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
737  }
738 
739  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.writethrough %s\n", m_configuration.m_write_through ? "on" : "off");
740 
741  if (m_configuration.m_username.empty())
742  {
743  char unameBuff[256];
744  XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
745  m_configuration.m_username = unameBuff;
746  }
747  else
748  {
749  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
750  }
751 
752  if (m_configuration.m_httpcc)
753  {
754  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.httpcc on\n");
755  }
756  if (m_configuration.m_qfsredir)
757  {
758  loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.qfsredir on\n");
759  }
760 
761  m_log.Say(buff);
762 
763  m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
764  }
765 
766  // Derived settings
767  m_prefetch_enabled = CFG.m_prefetch_max_blocks > 0 || CFG.m_cgi_max_prefetch_max_blocks > 0;
768  Info::s_maxNumAccess = CFG.m_accHistorySize;
769 
770  m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
771 
772  m_log.Say(" pfc g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive\n");
773 
774  // Create the ResourceMonitor and get it ready for starting the main thread function.
775  if (aOK)
776  {
777  m_res_mon = new ResourceMonitor(*m_oss);
778  m_res_mon->init_before_main();
779  }
780 
781  m_log.Say("=====> Proxy file cache configuration parsing ", aOK ? "completed" : "failed");
782 
783  if (ofsCfg) delete ofsCfg;
784 
785  // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and validated.
786  // Building of xrdpfc_print fails when this is enabled.
787 #ifdef XRDPFC_CKSUM_TEST
788  {
789  int xxx = m_configuration.m_cs_Chk;
790 
791  for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
792  {
793  Info::TestCksumStuff();
794  }
795 
796  m_configuration.m_cs_Chk = xxx;
797  }
798 #endif
799 
800  return aOK;
801 }
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition: XrdPfc.cc:80
#define open
Definition: XrdPosix.hh:78
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
long long Total
Definition: XrdOssVS.hh:90
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition: XrdOss.cc:117
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:188
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:281
char * Get(const char *varname)
Definition: XrdOucEnv.hh:69
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:70
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
static size_t s_maxNumAccess
Definition: XrdPfcInfo.hh:311
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:162
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_None
Definition: XrdPfcTypes.hh:27
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:66
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:124
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:111
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition: XrdPfc.hh:125
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:118
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:103
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:119
bool m_httpcc
enable http cache control
Definition: XrdPfc.hh:139
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition: XrdPfc.hh:122
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:114
bool m_write_through
flag indicating write-through mode is enabled
Definition: XrdPfc.hh:86
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:94
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:99
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:97
int m_dirStatsStoreDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:108
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:88
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:96
bool is_cschk_cache() const
Definition: XrdPfc.hh:77
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:106
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:115
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:129
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:98
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:128
bool m_qfsredir
redirect file system query to the origin
Definition: XrdPfc.hh:140
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:87
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:101
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:91
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:105
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:95
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:112
long long m_bufferSize
cache block size, default 128 kB
Definition: XrdPfc.hh:110
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition: XrdPfc.hh:117
int m_dirStatsInterval
time between resource monitor statistics dump in seconds
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:92
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:113
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition: XrdPfc.hh:120
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:90
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition: XrdPfc.hh:121
bool is_cschk_net() const
Definition: XrdPfc.hh:78
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:132
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:127
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:100
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:131
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:72
std::string m_diskUsageLWM
Definition: XrdPfc.hh:147
std::string m_diskUsageHWM
Definition: XrdPfc.hh:148
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:149
std::string m_fileUsageNominal
Definition: XrdPfc.hh:150
std::string m_flushRaw
Definition: XrdPfc.hh:152
std::string m_fileUsageMax
Definition: XrdPfc.hh:151

References XrdOuca2x::a2ll(), XrdOuca2x::a2sz(), XrdPfc::CSChk_Both, XrdPfc::CSChk_None, XrdSysError::Emsg(), Error, XrdOucEnv::Export(), XrdOucEnv::Get(), XrdOucEnv::GetPtr(), XrdPfc::ResourceMonitor::init_before_main(), XrdPfc::Configuration::is_cschk_cache(), XrdPfc::Configuration::is_cschk_net(), XrdPfc::Configuration::is_dir_stat_reporting_on(), XrdOfsConfigPI::Load(), XrdPfc::Configuration::m_accHistorySize, XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_cgi_blocksize_allowed, XrdPfc::Configuration::m_cgi_max_bufferSize, XrdPfc::Configuration::m_cgi_max_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_min_bufferSize, XrdPfc::Configuration::m_cgi_min_prefetch_max_blocks, XrdPfc::Configuration::m_cgi_prefetch_allowed, XrdPfc::Configuration::m_cs_Chk, XrdPfc::Configuration::m_cs_ChkTLS, XrdPfc::Configuration::m_cs_UVKeep, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_dirStatsDirGlobs, XrdPfc::Configuration::m_dirStatsDirs, XrdPfc::Configuration::m_dirStatsInterval, XrdPfc::Configuration::m_dirStatsStoreDepth, XrdPfc::Configuration::m_diskTotalSpace, XrdPfc::Configuration::m_diskUsageHWM, XrdPfc::TmpConfiguration::m_diskUsageHWM, XrdPfc::Configuration::m_diskUsageLWM, XrdPfc::TmpConfiguration::m_diskUsageLWM, XrdPfc::Configuration::m_fileUsageBaseline, XrdPfc::TmpConfiguration::m_fileUsageBaseline, XrdPfc::Configuration::m_fileUsageMax, XrdPfc::TmpConfiguration::m_fileUsageMax, XrdPfc::Configuration::m_fileUsageNominal, XrdPfc::TmpConfiguration::m_fileUsageNominal, XrdPfc::Configuration::m_flushCnt, XrdPfc::TmpConfiguration::m_flushRaw, XrdPfc::Configuration::m_hdfsbsize, XrdPfc::Configuration::m_hdfsmode, XrdPfc::Configuration::m_httpcc, XrdPfc::Configuration::m_meta_space, XrdPfc::Configuration::m_onlyIfCachedMinFrac, XrdPfc::Configuration::m_onlyIfCachedMinSize, XrdPfc::Configuration::m_prefetch_max_blocks, XrdPfc::Configuration::m_purgeColdFilesAge, 
XrdPfc::Configuration::m_purgeInterval, XrdPfc::Configuration::m_qfsredir, XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::Configuration::m_RamKeepStdBlocks, XrdPfc::Configuration::m_username, XrdPfc::Configuration::m_wqueue_blocks, XrdPfc::Configuration::m_wqueue_threads, XrdPfc::Configuration::m_write_through, XrdOfsConfigPI::New(), open, XrdOfsConfigPI::Parse(), XrdOfsConfigPI::Plugin(), XrdOfsConfigPI::Push(), XrdOucEnv::Put(), XrdPfc::Info::s_maxNumAccess, XrdSysError::Say(), XrdOss::StatVS(), XrdOfsConfigPI::theOssLib, XrdOssVSInfo::Total, TRACE, XrdOucUtils::UserName(), XrdSysTrace::What, and XrdOucGetCache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ConsiderCached()

int Cache::ConsiderCached ( const char *  curl)
virtual
Returns
0 - the file is considered cached, i.e. enough of it is present on local disk (see DecideIfConsideredCached()). No local path is returned, as this method takes no buffer.
<0 - the request could not be fulfilled. The return value is -errno describing why.
>0 - Reserved for future use.

Definition at line 1064 of file XrdPfc.cc.

1065 {
1066  static const char* tpfx = "ConsiderCached ";
1067 
1068  TRACE(Debug, tpfx << curl);
1069 
1070  XrdCl::URL url(curl);
1071  std::string f_name = url.GetPath();
1072 
1073  File *file = nullptr;
1074  {
1075  XrdSysCondVarHelper lock(&m_active_cond);
1076  auto it = m_active.find(f_name);
1077  if (it != m_active.end()) {
1078  file = it->second;
1079  // If the file-open is in progress, `file` is a nullptr
1080  // so we cannot increase the reference count. For now,
1081  // simply treat it as if the file open doesn't exist instead
1082  // of trying to wait and see if it succeeds.
1083  if (file) {
1084  inc_ref_cnt(file, false, false);
1085  }
1086  }
1087  }
1088  if (file) {
1089  struct stat sbuff;
1090  int res = file->Fstat(sbuff);
1091  dec_ref_cnt(file, false);
1092  if (res)
1093  return res;
1094  // DecideIfConsideredCached() already called in File::Fstat().
1095  return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1096  }
1097 
1098  struct stat sbuff;
1099  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1100  if (res != XrdOssOK) {
1101  TRACE(Debug, tpfx << curl << " -> " << res);
1102  return res;
1103  }
1104  if (S_ISDIR(sbuff.st_mode))
1105  {
1106  TRACE(Debug, tpfx << curl << " -> EISDIR");
1107  return -EISDIR;
1108  }
1109 
1110  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1111  if (file_size < 0) {
1112  TRACE(Debug, tpfx << curl << " -> " << file_size);
1113  return (int) file_size;
1114  }
1115  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1116 
1117  return is_cached ? 0 : -EREMOTE;
1118 }
#define XrdOssOK
Definition: XrdOss.hh:54
#define stat(a, b)
Definition: XrdPosix.hh:105
URL representation.
Definition: XrdClURL.hh:31
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:949
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:1031
int Fstat(struct stat &sbuff)
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by XrdPfcFSctl::FSctl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ CreateInstance()

Cache & Cache::CreateInstance ( XrdSysLogger logger,
XrdOucEnv env 
)
static

Singleton creation.

Definition at line 129 of file XrdPfc.cc.

130 {
131  assert (m_instance == 0);
132  m_instance = new Cache(logger, env);
133  return *m_instance;
134 }
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:162

References Cache().

Referenced by XrdOucGetCache().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Decide()

bool Cache::Decide ( XrdOucCacheIO io)

Decides whether the original XrdOucCacheIO should be cached.

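Parameters
io	the XrdOucCacheIO object; the path component of its URL is passed to each configured decision plugin
Returns
true if the IO object should be cached (no configured decision plugin declined it), false otherwise.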

Definition at line 141 of file XrdPfc.cc.

142 {
143  if (! m_decisionpoints.empty())
144  {
145  XrdCl::URL url(io->Path());
146  std::string filename = url.GetPath();
147  std::vector<Decision*>::const_iterator it;
148  for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
149  {
150  XrdPfc::Decision *d = *it;
151  if (! d) continue;
152  if (! d->Decide(filename, *m_oss))
153  {
154  return false;
155  }
156  }
157  }
158 
159  return true;
160 }
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0

References XrdPfc::Decision::Decide(), XrdCl::URL::GetPath(), and XrdOucCacheIO::Path().

Referenced by Attach().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecideIfConsideredCached()

bool Cache::DecideIfConsideredCached ( long long  file_size,
long long  bytes_on_disk 
)

Definition at line 1031 of file XrdPfc.cc.

1032 {
1033  if (file_size == 0 || bytes_on_disk >= file_size)
1034  return true;
1035 
1036  double frac_on_disk = (double) bytes_on_disk / file_size;
1037 
1038  if (file_size <= m_configuration.m_onlyIfCachedMinSize)
1039  {
1040  if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1041  return true;
1042  }
1043  else
1044  {
1045  if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
1046  frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1047  return true;
1048  }
1049  return false;
1050 }

References XrdPfc::Configuration::m_onlyIfCachedMinFrac, and XrdPfc::Configuration::m_onlyIfCachedMinSize.

Referenced by ConsiderCached(), and Stat().

+ Here is the caller graph for this function:

◆ DeRegisterPrefetchFile()

void Cache::DeRegisterPrefetchFile ( File file)

Definition at line 720 of file XrdPfc.cc.

721 {
722  // Can be called with other locks held.
723 
724  if ( ! m_prefetch_enabled)
725  {
726  return;
727  }
728 
729  m_prefetch_condVar.Lock();
730  for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
731  {
732  if (*it == file)
733  {
734  m_prefetchList.erase(it);
735  break;
736  }
737  }
738  m_prefetch_condVar.UnLock();
739 }

References XrdSysCondVar::Lock(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ DetermineFullFileSize()

long long Cache::DetermineFullFileSize ( const std::string &  cinfo_fname)

Definition at line 949 of file XrdPfc.cc.

950 {
951  if (m_metaXattr) {
952  char pfn[4096];
953  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
954  long long fsize = -1ll;
955  int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
956  if (res == sizeof(long long))
957  {
958  return fsize;
959  }
960  else
961  {
962  TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
963  }
964  }
965 
966  XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
967  XrdOucEnv env;
968  long long ret;
969  int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
970  if (res < 0) {
971  ret = res;
972  } else {
973  Info info(m_trace, 0);
974  if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
975  ret = -EBADF;
976  } else {
977  ret = info.GetFileSize();
978  }
979  infoFile->Close();
980  }
981  delete infoFile;
982  return ret;
983 }
XrdSysXAttr * XrdSysXAttrActive
Definition: XrdSysFAttr.cc:61
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:228
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition: XrdOss.hh:954
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0

References XrdOssDF::Close(), Debug, XrdSysXAttr::Get(), XrdPfc::Info::GetFileSize(), XrdOss::Lfn2Pfn(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), TRACE, and XrdSysXAttrActive.

Referenced by ConsiderCached(), and Stat().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ExecuteCommandUrl()

void Cache::ExecuteCommandUrl ( const std::string &  command_url)

Definition at line 51 of file XrdPfcCommand.cc.

52 {
53  static const char *top_epfx = "ExecuteCommandUrl ";
54 
55  SplitParser cp(command_url, "/");
56 
57  std::string token = cp.get_token();
58 
59  if (token != "xrdpfc_command")
60  {
61  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
62  return;
63  }
64 
65  // Get the command
66  token = cp.get_token_as_string();
67 
68  auto get_opt = [](SplitParser &sp) -> char {
69  const char *t = sp.get_token();
70  if (t)
71  return (t[0] == '-' && t[1] != 0) ? t[1] : 0;
72  else
73  return -1;
74  };
75 
76  //================================================================
77  // create_file
78  //================================================================
79 
80  if (token == "create_file")
81  {
82  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
83  static const char* usage =
84  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
85  " Creates a cache file with given parameters. Data in file is random.\n"
86  " Useful for cache purge testing.\n"
87  "Notes:\n"
88  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
89  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
90  " . -t and -d can be given multiple times to record several accesses.\n"
91  " . Negative arguments given to -t are interpreted as relative to now.\n";
92 
93  const Configuration &conf = m_configuration;
94 
95  token = cp.get_token_as_string();
96  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
97  if (token.empty()) {
98  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
99  return;
100  }
101  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
102  if ( ! cp.has_reminder()) {
103  TRACE(Error, err_prefix << "Path section must not be empty.");
104  return;
105  }
106 
107  long long file_size = ONE_GB;
108  long long block_size = conf.m_bufferSize;
109  int access_time [MAX_ACCESSES];
110  int access_duration[MAX_ACCESSES];
111  int at_count = 0, ad_count = 0;
112 
113  time_t time_now = time(0);
114 
115  SplitParser ap(token, " ");
116  char theOpt;
117 
118  while ((theOpt = get_opt(ap)) != (char) -1)
119  {
120  switch (theOpt)
121  {
122  case 'h': {
123  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
124  return;
125  }
126  case 's': {
127  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", ap.get_token(),
128  &file_size, 0ll, 32 * ONE_GB))
129  return;
130  break;
131  }
132  case 'b': {
133  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", ap.get_token(),
134  &block_size, 0ll, 64 * ONE_MB))
135  return;
136  break;
137  }
138  case 't': {
139  if (XrdOuca2x::a2i(m_log, "Error getting access time", ap.get_token(),
140  &access_time[at_count++], INT_MIN, INT_MAX))
141  return;
142  break;
143  }
144  case 'd': {
145  if (XrdOuca2x::a2i(m_log, "Error getting access duration", ap.get_token(),
146  &access_duration[ad_count++], 0, 24 * 3600))
147  return;
148  break;
149  }
150  default: {
151  TRACE(Error, err_prefix << "Unhandled command argument.");
152  return;
153  }
154  }
155  }
156 
157  if (at_count < 1) access_time [at_count++] = time_now - 10;
158  if (ad_count < 1) access_duration[ad_count++] = 10;
159 
160  if (at_count != ad_count)
161  {
162  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
163  return;
164  }
165 
166  std::string file_path (cp.get_reminder_with_delim());
167  std::string cinfo_path(file_path + Info::s_infoExtension);
168 
169  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
170 
171  // Check if cinfo exists ... bail out if it does.
172  {
173  struct stat infoStat;
174  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
175  {
176  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
177  return;
178  }
179  }
180 
181  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
182 
183  {
184  const char *myUser = conf.m_username.c_str();
185  XrdOucEnv myEnv;
186 
187  // Create the data file.
188 
189  char size_str[32]; sprintf(size_str, "%lld", file_size);
190  myEnv.Put("oss.asize", size_str);
191  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
192  int cret;
193  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
194  {
195  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
196  return;
197  }
198 
199  XrdOssDF *myFile = GetOss()->newFile(myUser);
200  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
201  {
202  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ERRNO_AND_ERRSTR(-cret));
203  delete myFile;
204  return;
205  }
206 
207  // Create the info file.
208 
209  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
210  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
211  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
212  {
213  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
214  myFile->Close(); delete myFile;
215  return;
216  }
217 
218  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
219  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
220  {
221  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ERRNO_AND_ERRSTR(-cret));
222  delete myInfoFile;
223  myFile->Close(); delete myFile;
224  return;
225  }
226 
227  // Allocate space for the data file.
228 
229  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
230  {
231  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ERRNO_AND_ERRSTR(cret));
232  }
233 
234  // Fill up cinfo.
235 
236  Info myInfo(m_trace, false);
237  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
238  myInfo.SetAllBitsSynced();
239 
240  for (int i = 0; i < at_count; ++i)
241  {
242  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
243 
244  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
245  }
246 
247  myInfo.Write(myInfoFile, cinfo_path.c_str());
248 
249  // Fake last modified time to the last access_time
250  {
251  time_t last_detach;
252  myInfo.GetLatestDetachTime(last_detach);
253  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
254 
255  futimens(myInfoFile->getFD(), acc_mod_time);
256  }
257 
258  myInfoFile->Close(); delete myInfoFile;
259  myFile->Close(); delete myFile;
260 
261  struct stat dstat;
262  GetOss()->Stat(file_path.c_str(), &dstat);
263  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB, "
264  << "st_blocks=" << dstat.st_blocks);
265 
266  {
267  XrdSysCondVarHelper lock(&m_writeQ.condVar);
268 
269  m_writeQ.writes_between_purges += file_size;
270  }
271  {
272  int token = m_res_mon->register_file_open(file_path, time_now, false);
273  XrdPfc::Stats stats;
274  stats.m_BytesWritten = file_size;
275  stats.m_StBlocksAdded = dstat.st_blocks;
276  m_res_mon->register_file_update_stats(token, stats);
277  m_res_mon->register_file_close(token, time(0), stats);
278  }
279  }
280  }
281 
282  //================================================================
283  // remove_file
284  //================================================================
285 
286  else if (token == "remove_file")
287  {
288  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
289  static const char* usage =
290  "Usage: remove_file/ [-h] /<path>\n"
291  " Removes given file from the cache unless it is currently open.\n"
292  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
293  "Notes:\n"
294  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
295 
296  token = cp.get_token_as_string();
297  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
298  if (token.empty()) {
299  TRACE(Error, err_prefix << "Options section must not be empty, a single space character is OK.");
300  return;
301  }
302  TRACE(Debug, err_prefix << "File path (reminder of URL) is '" << cp.get_reminder() <<"'.");
303  if ( ! cp.has_reminder()) {
304  TRACE(Error, err_prefix << "Path section must not be empty.");
305  return;
306  }
307 
308  SplitParser ap(token, " ");
309  char theOpt;
310 
311  while ((theOpt = get_opt(ap)) != (char) -1)
312  {
313  switch (theOpt)
314  {
315  case 'h': {
316  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
317  return;
318  }
319  default: {
320  TRACE(Error, err_prefix << "Unhandled command argument.");
321  return;
322  }
323  }
324  }
325 
326  std::string f_name(cp.get_reminder_with_delim());
327 
328  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
329 
330  int ret = UnlinkFile(f_name, true);
331 
332  TRACE(Info, err_prefix << "returned with status " << ret);
333  }
334 
335  //================================================================
336  // unknown command
337  //================================================================
338 
339  else
340  {
341  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
342  }
343 }
void usage()
#define XRDOSS_mkpath
Definition: XrdOss.hh:526
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
bool Create
virtual int getFD()
Definition: XrdOss.hh:486
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
XrdOss * GetOss() const
Definition: XrdPfc.hh:290
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1193
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1264
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42

References XrdOuca2x::a2i(), XrdOuca2x::a2sz(), XrdOssDF::Close(), Create, Debug, ERRNO_AND_ERRSTR, Error, XrdPfc::SplitParser::get_reminder(), XrdPfc::SplitParser::get_reminder_with_delim(), XrdPfc::SplitParser::get_token(), XrdPfc::SplitParser::get_token_as_string(), XrdOssDF::getFD(), XrdPfc::Info::GetLatestDetachTime(), GetOss(), XrdPfc::SplitParser::has_reminder(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Stats::m_BytesWritten, XrdPfc::Configuration::m_data_space, XrdPfc::Configuration::m_meta_space, XrdPfc::Stats::m_StBlocksAdded, XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdOucEnv::Put(), XrdPfc::ResourceMonitor::register_file_close(), XrdPfc::ResourceMonitor::register_file_open(), XrdPfc::ResourceMonitor::register_file_update_stats(), XrdPfc::Info::s_infoExtension, XrdSysError::Say(), XrdPfc::Info::SetAllBitsSynced(), XrdPfc::Info::SetBufferSizeFileSizeAndCreationTime(), stat, XrdOss::Stat(), Stat(), TRACE, UnlinkFile(), usage(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), XRDOSS_mkpath, and XrdOssOK.

+ Here is the call graph for this function:

◆ FileSyncDone()

void Cache::FileSyncDone ( File f,
bool  high_debug 
)

Definition at line 551 of file XrdPfc.cc.

552 {
553  dec_ref_cnt(f, high_debug);
554 }

◆ GetCacheControlXAttr() [1/2]

int Cache::GetCacheControlXAttr ( const std::string &  cinfo_fname,
std::string &  res 
) const

Definition at line 989 of file XrdPfc.cc.

990 {
991  if (m_metaXattr) {
992 
993  char pfn[4096];
994  m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
995 
996  char cc[512];
997  int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1);
998  if (res > 0)
999  {
1000  std::string tmp(cc, res);
1001  ival = tmp;
1002  }
1003  return res;
1004  }
1005  return 0;
1006 }

References XrdSysXAttr::Get(), XrdOss::Lfn2Pfn(), and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ GetCacheControlXAttr() [2/2]

int Cache::GetCacheControlXAttr ( int  fd,
std::string &  res 
) const

Definition at line 1012 of file XrdPfc.cc.

1013 {
1014  if (m_metaXattr) {
1015  char cc[512];
1016  int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd);
1017  if (res > 0)
1018  {
1019  ival = std::string(cc, res);
1020  return res;
1021  }
1022  }
1023  return 0;
1024 }

References XrdSysXAttr::Get(), and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ GetFile()

File * Cache::GetFile ( const std::string &  path,
IO io,
long long  off = 0,
long long  filesize = 0 
)

Definition at line 397 of file XrdPfc.cc.

398 {
399  // Called from virtual IOFile constructor.
400 
401  TRACE(Debug, "GetFile " << path << ", io " << io);
402 
403  ActiveMap_i it;
404 
405  {
406  XrdSysCondVarHelper lock(&m_active_cond);
407 
408  while (true)
409  {
410  it = m_active.find(path);
411 
412  // File is not open or being opened. Mark it as being opened and
413  // proceed to opening it outside of while loop.
414  if (it == m_active.end())
415  {
416  it = m_active.insert(std::make_pair(path, (File*) 0)).first;
417  break;
418  }
419 
420  if (it->second != 0)
421  {
422  it->second->AddIO(io);
423  inc_ref_cnt(it->second, false, true);
424 
425  return it->second;
426  }
427  else
428  {
429  // Wait for some change in m_active, then recheck.
430  m_active_cond.Wait();
431  }
432  }
433  }
434 
435  // This is always true, now that IOFileBlock is unsupported.
436 
437  if (filesize == 0)
438  {
439  struct stat st;
440  int res = io->Fstat(st);
441  if (res < 0) {
442  errno = res;
443  TRACE(Error, "GetFile, could not get valid stat");
444  } else if (res > 0) {
445  errno = ENOTSUP;
446  TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
447  } else {
448  filesize = st.st_size;
449  }
450  }
451 
452  File *file = 0;
453 
454  if (filesize >= 0)
455  {
456  file = File::FileOpen(path, off, filesize, io->GetInput());
457  }
458 
459  {
460  XrdSysCondVarHelper lock(&m_active_cond);
461 
462  if (file)
463  {
464  inc_ref_cnt(file, false, true);
465  it->second = file;
466 
467  file->AddIO(io);
468  }
469  else
470  {
471  m_active.erase(it);
472  }
473 
474  m_active_cond.Broadcast();
475  }
476 
477  return file;
478 }
virtual int Fstat(struct stat &sbuff)
Definition: XrdOucCache.hh:175
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void AddIO(IO *io)
XrdOucCacheIO * GetInput()
Definition: XrdPfcIO.cc:31

References XrdPfc::File::AddIO(), XrdSysCondVar::Broadcast(), Debug, Error, XrdPfc::File::FileOpen(), XrdOucCacheIO::Fstat(), XrdPfc::IO::GetInput(), stat, TRACE, and XrdSysCondVar::Wait().

Referenced by XrdPfc::IOFile::IOFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetGStream()

XrdXrootdGStream* XrdPfc::Cache::GetGStream ( )
inline

Definition at line 308 of file XrdPfc.hh.

308 { return m_gstream; }

◆ GetInstance()

Cache & Cache::GetInstance ( )
static

Singleton access.

Definition at line 136 of file XrdPfc.cc.

136 { return *m_instance; }

Referenced by XrdPfc::IOFile::IOFile(), XrdPfc::IOFileBlock::IOFileBlock(), Attach(), XrdPfc::IOFile::DetachFinalize(), XrdPfc::ResourceMonitor::perform_purge_check(), XrdPfc::ResourceMonitor::perform_purge_task_cleanup(), PrefetchThread(), Prepare(), ProcessWriteTaskThread(), and Proto_ResourceMonitorHeartBeat().

+ Here is the caller graph for this function:

◆ GetLog()

XrdSysError* XrdPfc::Cache::GetLog ( ) const
inline

Definition at line 304 of file XrdPfc.hh.

304 { return &m_log; }

◆ GetNextFileToPrefetch()

File * Cache::GetNextFileToPrefetch ( )

Definition at line 742 of file XrdPfc.cc.

743 {
744  m_prefetch_condVar.Lock();
745  while (m_prefetchList.empty())
746  {
747  m_prefetch_condVar.Wait();
748  }
749 
750  // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
751 
752  size_t l = m_prefetchList.size();
753  int idx = rand() % l;
754  File* f = m_prefetchList[idx];
755 
756  m_prefetch_condVar.UnLock();
757  return f;
758 }

References XrdSysCondVar::Lock(), XrdSysCondVar::UnLock(), and XrdSysCondVar::Wait().

Referenced by Prefetch().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetOss()

XrdOss* XrdPfc::Cache::GetOss ( ) const
inline

Definition at line 290 of file XrdPfc.hh.

290 { return m_oss; }

Referenced by ExecuteCommandUrl().

+ Here is the caller graph for this function:

◆ GetPurgePin()

PurgePin* XrdPfc::Cache::GetPurgePin ( ) const
inline

Definition at line 294 of file XrdPfc.hh.

294 { return m_purge_pin; }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

◆ GetTrace()

XrdSysTrace* XrdPfc::Cache::GetTrace ( ) const
inline

Definition at line 305 of file XrdPfc.hh.

305 { return m_trace; }

Referenced by XrdPfc::IO::GetTrace().

+ Here is the caller graph for this function:

◆ is_prefetch_enabled()

bool XrdPfc::Cache::is_prefetch_enabled ( ) const
inline

Definition at line 317 of file XrdPfc.hh.

317 { return m_prefetch_enabled; }

Referenced by XrdOucGetCache().

+ Here is the caller graph for this function:

◆ IsFileActiveOrPurgeProtected()

bool Cache::IsFileActiveOrPurgeProtected ( const std::string &  path) const

Definition at line 686 of file XrdPfc.cc.

687 {
688  XrdSysCondVarHelper lock(&m_active_cond);
689 
690  return m_active.find(path) != m_active.end() ||
691  m_purge_delay_set.find(path) != m_purge_delay_set.end();
692 }

◆ LocalFilePath()

int Cache::LocalFilePath ( const char *  curl,
char *  buff = 0,
int  blen = 0,
LFP_Reason  why = ForAccess,
bool  forall = false 
)
virtual

Get the path to a file that is complete in the local cache. By default, the file must be complete in the cache (i.e. no blocks are missing). This can be overridden. This path can be used to access the file on the local node.

Returns
0 - the file is complete and the local path to the file is in the buffer, if it has been supplied.
<0 - the request could not be fulfilled. The return value is -errno describing why. If a buffer was supplied and a path could be generated it is returned only if "why" is ForPath or ForInfo. Otherwise, a null path is returned.
>0 - Reserved for future use.

Reimplemented from XrdOucCache.

Definition at line 803 of file XrdPfc.cc.

805 {
806  static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
807  static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
808  static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
809 
810  TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
811 
812  if (buff && blen > 0) buff[0] = 0;
813 
814  XrdCl::URL url(curl);
815  std::string f_name = url.GetPath();
816  std::string i_name = f_name + Info::s_infoExtension;
817 
818  if (why == ForPath)
819  {
820  int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
821  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
822  return ret;
823  }
824 
825  {
826  XrdSysCondVarHelper lock(&m_active_cond);
827  m_purge_delay_set.insert(f_name);
828  }
829 
830  struct stat sbuff, sbuff2;
831  if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
832  m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
833  {
834  if (S_ISDIR(sbuff.st_mode))
835  {
836  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
837  return -EISDIR;
838  }
839  else
840  {
841  bool read_ok = false;
842  bool is_complete = false;
843 
844  // Lock and check if the file is active. If NOT, keep the lock
845  // and add dummy access after successful reading of info file.
846  // If it IS active, just release the lock, this ongoing access will
847  // assure the file continues to exist.
848 
849  // XXXX How can I just loop over the cinfo file when active?
850  // Can I not get is_complete from the existing file?
851  // Do I still want to inject access record?
852  // Oh, it writes only if not active .... still let's try to use existing File.
853 
854  m_active_cond.Lock();
855 
856  bool is_active = m_active.find(f_name) != m_active.end();
857 
858  if (is_active) m_active_cond.UnLock();
859 
860  XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
861  XrdOucEnv myEnv;
862  int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
863  if (res >= 0)
864  {
865  Info info(m_trace, 0);
866  if (info.Read(infoFile, i_name.c_str()))
867  {
868  read_ok = true;
869 
870  is_complete = info.IsComplete();
871 
872  // Add full-size access if reason is for access.
873  if ( ! is_active && is_complete && why == ForAccess)
874  {
875  info.WriteIOStatSingle(info.GetFileSize());
876  info.Write(infoFile, i_name.c_str());
877  }
878  }
879  infoFile->Close();
880  }
881  delete infoFile;
882 
883  if ( ! is_active) m_active_cond.UnLock();
884 
885  if (read_ok)
886  {
887  if ((is_complete || why == ForInfo) && buff != 0)
888  {
889  int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
890  if (res2 < 0)
891  return res2;
892 
893  // Normally, files are owned by us but when direct cache access
894  // is wanted and possible, make sure the file is world readable.
895  if (why == ForAccess)
896  {mode_t mode = (forall ? worldReadable : groupReadable);
897  if (((sbuff.st_mode & worldReadable) != mode)
898  && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
899  {is_complete = false;
900  *buff = 0;
901  }
902  }
903  }
904 
905  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
906  (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
907 
908  return is_complete ? 0 : -EREMOTE;
909  }
910  }
911  }
912 
913  TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
914  return -ENOENT;
915 }
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0

References XrdOss::Chmod(), XrdOssDF::Close(), Debug, XrdOucCache::ForAccess, XrdOucCache::ForInfo, XrdOucCache::ForPath, XrdPfc::Info::GetFileSize(), XrdCl::URL::GetPath(), XrdPfc::Info::IsComplete(), XrdOss::Lfn2Pfn(), XrdSysCondVar::Lock(), XrdPfc::Configuration::m_username, XrdOss::newFile(), XrdOssDF::Open(), XrdPfc::Info::Read(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdSysCondVar::UnLock(), XrdPfc::Info::Write(), XrdPfc::Info::WriteIOStatSingle(), and XrdOssOK.

+ Here is the call graph for this function:

◆ Prefetch()

void Cache::Prefetch ( )

Definition at line 761 of file XrdPfc.cc.

762 {
763  const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
764 
765  while (true)
766  {
767  m_RAM_mutex.Lock();
768  bool doPrefetch = (m_RAM_used < limit_RAM);
769  m_RAM_mutex.UnLock();
770 
771  if (doPrefetch)
772  {
774  f->Prefetch();
775  }
776  else
777  {
779  }
780  }
781 }
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:742
void Prefetch()
static void Wait(int milliseconds)
Definition: XrdSysTimer.cc:227

References GetNextFileToPrefetch(), XrdSysMutex::Lock(), XrdPfc::Configuration::m_RamAbsAvailable, XrdPfc::File::Prefetch(), XrdSysMutex::UnLock(), and XrdSysTimer::Wait().

Referenced by PrefetchThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ prefetch_str2value()

bool Cache::prefetch_str2value ( const char *  from,
const char *  str,
int &  val,
int  min,
int  max 
) const

Definition at line 121 of file XrdPfcConfiguration.cc.

123 {
124  if (XrdOuca2x::a2i(m_log, "Error parsing prefetch block count", str, &val, min, max))
125  return false;
126 
127  return true;
128 }

References XrdOuca2x::a2i().

+ Here is the call graph for this function:

◆ Prepare()

int Cache::Prepare ( const char *  curl,
int  oflags,
mode_t  mode 
)
virtual

Prepare the cache for a file open request. This method is called prior to actually opening a file. This method is meant to allow deferring an open request or implementing the full I/O stack in the cache layer.

Returns
<0 Error has occurred, return value is -errno; fail open request. =0 Continue with open() request. >0 Defer open but treat the file as actually being open. Use the XrdOucCacheIO::Open() method to open the file at a later time.

Reimplemented from XrdOucCache.

Definition at line 1130 of file XrdPfc.cc.

1131 {
1132  XrdCl::URL url(curl);
1133  std::string f_name = url.GetPath();
1134  std::string i_name = f_name + Info::s_infoExtension;
1135 
1136  // Do not allow write access.
1137  if ((oflags & O_ACCMODE) != O_RDONLY)
1138  {
1139  if (Cache::GetInstance().RefConfiguration().m_write_through)
1140  {
1141  return 0;
1142  }
1143  TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1144  return -EROFS;
1145  }
1146 
1147  // Intercept xrdpfc_command requests.
1148  if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1149  {
1150  // Schedule a job to process command request.
1151  {
1152  CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1153 
1154  schedP->Schedule(ce);
1155  }
1156 
1157  return -EAGAIN;
1158  }
1159 
1160  {
1161  XrdSysCondVarHelper lock(&m_active_cond);
1162  m_purge_delay_set.insert(f_name);
1163  }
1164 
1165  struct stat sbuff;
1166  if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1167  {
1168 
1169  if (m_configuration.m_httpcc && !is_http_cache_valid(f_name, i_name, url))
1170  {
1171  TRACE(Info, "Http cache not valid " << f_name);
1172  UnlinkFile(f_name, false);
1173  return 0;
1174  }
1175 
1176  TRACE(Dump, "Prepare defer open " << f_name);
1177  return 1;
1178  }
1179  else
1180  {
1181  return 0;
1182  }
1183 }
static XrdScheduler * schedP
Definition: XrdPfc.hh:312
void Schedule(XrdJob *jp)
@ Warning

References GetInstance(), XrdCl::URL::GetPath(), XrdPfc::Configuration::m_allow_xrdpfc_command, XrdPfc::Configuration::m_httpcc, RefConfiguration(), XrdPfc::Info::s_infoExtension, schedP, XrdScheduler::Schedule(), stat, XrdOss::Stat(), TRACE, UnlinkFile(), TPC::Warning, and XrdOssOK.

+ Here is the call graph for this function:

◆ ProcessWriteTasks()

void Cache::ProcessWriteTasks ( )

Separate task which writes blocks from RAM to disk.

Definition at line 281 of file XrdPfc.cc.

282 {
283  std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
284 
285  while (true)
286  {
287  m_writeQ.condVar.Lock();
288  while (m_writeQ.size == 0)
289  {
290  m_writeQ.condVar.Wait();
291  }
292 
293  // MT -- optimize to pop several blocks if they are available (or swap the list).
294  // This makes sense especially for smallish block sizes.
295 
296  int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
297  long long sum_size = 0;
298 
299  for (int bi = 0; bi < n_pushed; ++bi)
300  {
301  Block* block = m_writeQ.queue.front();
302  m_writeQ.queue.pop_front();
303  m_writeQ.writes_between_purges += block->get_size();
304  sum_size += block->get_size();
305 
306  blks_to_write[bi] = block;
307 
308  TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
309  }
310  m_writeQ.size -= n_pushed;
311 
312  m_writeQ.condVar.UnLock();
313 
314  {
315  XrdSysMutexHelper lock(&m_RAM_mutex);
316  m_RAM_write_queue -= sum_size;
317  }
318 
319  for (int bi = 0; bi < n_pushed; ++bi)
320  {
321  Block* block = blks_to_write[bi];
322 
323  block->m_file->WriteBlockToDisk(block);
324  }
325  }
326 }
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)

References XrdPfc::Block::get_size(), XrdPfc::File::lPath(), XrdPfc::Block::m_file, XrdPfc::Configuration::m_wqueue_blocks, TRACE, and XrdPfc::File::WriteBlockToDisk().

Referenced by ProcessWriteTaskThread().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RefConfiguration()

const Configuration& XrdPfc::Cache::RefConfiguration ( ) const
inline

Reference XrdPfc configuration.

Definition at line 225 of file XrdPfc.hh.

225 { return m_configuration; }

Referenced by XrdPfc::IOFileBlock::IOFileBlock(), Attach(), Conf(), Prepare(), and XrdOucGetCache().

+ Here is the caller graph for this function:

◆ RefResMon()

ResourceMonitor& XrdPfc::Cache::RefResMon ( )
inline

Definition at line 307 of file XrdPfc.hh.

307 { return *m_res_mon; }

Referenced by ResMon().

+ Here is the caller graph for this function:

◆ RegisterPrefetchFile()

void Cache::RegisterPrefetchFile ( File file)

Definition at line 704 of file XrdPfc.cc.

705 {
706  // Can be called with other locks held.
707 
708  if ( ! m_prefetch_enabled)
709  {
710  return;
711  }
712 
713  m_prefetch_condVar.Lock();
714  m_prefetchList.push_back(file);
715  m_prefetch_condVar.Signal();
716  m_prefetch_condVar.UnLock();
717 }

References XrdSysCondVar::Lock(), XrdSysCondVar::Signal(), and XrdSysCondVar::UnLock().

+ Here is the call graph for this function:

◆ ReleaseFile()

void Cache::ReleaseFile ( File f,
IO io 
)

Definition at line 480 of file XrdPfc.cc.

481 {
482  // Called from virtual IO::DetachFinalize.
483 
484  TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
485 
486  {
487  XrdSysCondVarHelper lock(&m_active_cond);
488 
489  f->RemoveIO(io);
490  }
491  dec_ref_cnt(f, true);
492 }
void RemoveIO(IO *io)

References Debug, XrdPfc::File::GetLocalPath(), XrdPfc::File::RemoveIO(), and TRACE.

Referenced by XrdPfc::IOFile::DetachFinalize(), and XrdPfc::IOFileBlock::DetachFinalize().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ReleaseRAM()

void Cache::ReleaseRAM ( char *  buf,
long long  size 
)

Definition at line 379 of file XrdPfc.cc.

380 {
381  bool std_size = (size == m_configuration.m_bufferSize);
382  {
383  XrdSysMutexHelper lock(&m_RAM_mutex);
384 
385  m_RAM_used -= size;
386 
387  if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
388  {
389  m_RAM_std_blocks.push_back(buf);
390  ++m_RAM_std_size;
391  return;
392  }
393  }
394  free(buf);
395 }

References XrdPfc::Configuration::m_bufferSize, and XrdPfc::Configuration::m_RamKeepStdBlocks.

◆ RemoveWriteQEntriesFor()

void Cache::RemoveWriteQEntriesFor ( File f)

Remove blocks from the write queue which belong to the given file. This method is used at the time of File destruction.

Definition at line 248 of file XrdPfc.cc.

249 {
250  std::list<Block*> removed_blocks;
251  long long sum_size = 0;
252 
253  m_writeQ.condVar.Lock();
254  std::list<Block*>::iterator i = m_writeQ.queue.begin();
255  while (i != m_writeQ.queue.end())
256  {
257  if ((*i)->m_file == file)
258  {
259  TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
260  std::list<Block*>::iterator j = i++;
261  removed_blocks.push_back(*j);
262  sum_size += (*j)->get_size();
263  m_writeQ.queue.erase(j);
264  --m_writeQ.size;
265  }
266  else
267  {
268  ++i;
269  }
270  }
271  m_writeQ.condVar.UnLock();
272 
273  {
274  XrdSysMutexHelper lock(&m_RAM_mutex);
275  m_RAM_write_queue -= sum_size;
276  }
277 
278  file->BlocksRemovedFromWriteQ(removed_blocks);
279 }

References XrdPfc::File::BlocksRemovedFromWriteQ(), XrdPfc::File::lPath(), and TRACE.

Referenced by UnlinkFile().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RequestRAM()

char * Cache::RequestRAM ( long long  size)

Definition at line 339 of file XrdPfc.cc.

340 {
341  static const size_t s_block_align = sysconf(_SC_PAGESIZE);
342 
343  bool std_size = (size == m_configuration.m_bufferSize);
344 
345  m_RAM_mutex.Lock();
346 
347  long long total = m_RAM_used + size;
348 
349  if (total <= m_configuration.m_RamAbsAvailable)
350  {
351  m_RAM_used = total;
352  if (std_size && m_RAM_std_size > 0)
353  {
354  char *buf = m_RAM_std_blocks.back();
355  m_RAM_std_blocks.pop_back();
356  --m_RAM_std_size;
357 
358  m_RAM_mutex.UnLock();
359 
360  return buf;
361  }
362  else
363  {
364  m_RAM_mutex.UnLock();
365  char *buf;
366  if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
367  {
368  // Report out of mem? Probably should report it at least the first time,
369  // then periodically.
370  return 0;
371  }
372  return buf;
373  }
374  }
375  m_RAM_mutex.UnLock();
376  return 0;
377 }

References XrdSysMutex::Lock(), XrdPfc::Configuration::m_bufferSize, XrdPfc::Configuration::m_RamAbsAvailable, and XrdSysMutex::UnLock().

+ Here is the call graph for this function:

◆ ResMon()

ResourceMonitor & Cache::ResMon ( )
static

Definition at line 139 of file XrdPfc.cc.

139 { return m_instance->RefResMon(); }
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:307

References RefResMon().

Referenced by XrdPfc::ResourceMonitor::perform_purge_check(), ResourceMonitorThread(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ScheduleFileSync()

void XrdPfc::Cache::ScheduleFileSync ( File f)
inline

Definition at line 300 of file XrdPfc.hh.

300 { schedule_file_sync(f, false, false); }

◆ Stat()

int Cache::Stat ( const char *  curl,
struct stat sbuff 
)
virtual
Returns
<0 - Stat failed, value is -errno. =0 - Stat succeeded, sbuff holds stat information. >0 - Stat could not be done, forward operation to next level.

Reimplemented from XrdOucCache.

Definition at line 1193 of file XrdPfc.cc.

1194 {
1195  const char *tpfx = "Stat ";
1196 
1197  XrdCl::URL url(curl);
1198  std::string f_name = url.GetPath();
1199 
1200  File *file = nullptr;
1201  {
1202  XrdSysCondVarHelper lock(&m_active_cond);
1203  auto it = m_active.find(f_name);
1204  if (it != m_active.end()) {
1205  file = it->second;
1206  // If `file` is nullptr, the file-open is in progress; instead
1207  // of waiting for the file-open to finish, simply treat it as if
1208  // the file-open doesn't exist.
1209  if (file) {
1210  inc_ref_cnt(file, false, false);
1211  }
1212  }
1213  }
1214  if (file) {
1215  int res = file->Fstat(sbuff);
1216  dec_ref_cnt(file, false);
1217  TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1218  return res;
1219  }
1220 
1221  int res = m_oss->Stat(f_name.c_str(), &sbuff);
1222  if (res != XrdOssOK) {
1223  TRACE(Debug, tpfx << curl << " -> " << res);
1224  return 1; // res; -- for only-if-cached
1225  }
1226  if (S_ISDIR(sbuff.st_mode))
1227  {
1228  TRACE(Debug, tpfx << curl << " -> EISDIR");
1229  return -EISDIR;
1230  }
1231 
1232  long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1233  if (file_size < 0) {
1234  TRACE(Debug, tpfx << curl << " -> " << file_size);
1235  return 1; // (int) file_size; -- for only-if-cached
1236  }
1237  sbuff.st_size = file_size;
1238  bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1239  if ( ! is_cached)
1240  sbuff.st_atime = 0;
1241 
1242  TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1243 
1244  return 0;
1245 }

References Debug, DecideIfConsideredCached(), DetermineFullFileSize(), XrdPfc::File::Fstat(), XrdCl::URL::GetPath(), XrdPfc::Info::s_infoExtension, XrdOss::Stat(), TRACE, and XrdOssOK.

Referenced by ExecuteCommandUrl().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ TheOne()

const Cache & Cache::TheOne ( )
static

Definition at line 137 of file XrdPfc.cc.

137 { return *m_instance; }

Referenced by XrdPfc::OldStylePurgeDriver(), and XrdPfc::UnlinkPurgeStateFilesInMap().

+ Here is the caller graph for this function:

◆ Unlink()

int Cache::Unlink ( const char *  curl)
virtual
Returns
<0 - Unlink failed, value is -errno. =0 - Unlink succeeded.

Reimplemented from XrdOucCache.

Definition at line 1254 of file XrdPfc.cc.

1255 {
1256  XrdCl::URL url(curl);
1257  std::string f_name = url.GetPath();
1258 
1259  // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1260 
1261  return UnlinkFile(f_name, false);
1262 }

References XrdCl::URL::GetPath(), and UnlinkFile().

+ Here is the call graph for this function:

◆ UnlinkFile()

int Cache::UnlinkFile ( const std::string &  f_name,
bool  fail_if_open 
)

Remove cinfo and data files from cache.

Definition at line 1264 of file XrdPfc.cc.

1265 {
1266  static const char* trc_pfx = "UnlinkFile ";
1267  ActiveMap_i it;
1268  File *file = 0;
1269  long long st_blocks_to_purge = 0;
1270  {
1271  XrdSysCondVarHelper lock(&m_active_cond);
1272 
1273  it = m_active.find(f_name);
1274 
1275  if (it != m_active.end())
1276  {
1277  if (fail_if_open)
1278  {
1279  TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1280  return -EBUSY;
1281  }
1282 
1283  // Null File* in m_active map means an operation is ongoing, probably
1284  // Attach() with possible File::Open(). Ask for retry.
1285  if (it->second == 0)
1286  {
1287  TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1288  return -EAGAIN;
1289  }
1290 
1291  file = it->second;
1292  st_blocks_to_purge = file->initiate_emergency_shutdown();
1293  it->second = 0;
1294  }
1295  else
1296  {
1297  it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1298  }
1299  }
1300 
1301  if (file) {
1302  RemoveWriteQEntriesFor(file);
1303  } else {
1304  struct stat f_stat;
1305  if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1306  st_blocks_to_purge = f_stat.st_blocks;
1307  }
1308 
1309  std::string i_name = f_name + Info::s_infoExtension;
1310 
1311  // Unlink file & cinfo
1312  int f_ret = m_oss->Unlink(f_name.c_str());
1313  int i_ret = m_oss->Unlink(i_name.c_str());
1314 
1315  if (st_blocks_to_purge)
1316  m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1317 
1318  TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1319 
1320  {
1321  XrdSysCondVarHelper lock(&m_active_cond);
1322  m_active.erase(it);
1323  m_active_cond.Broadcast();
1324  }
1325 
1326  return std::min(f_ret, i_ret);
1327 }
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:248
long long initiate_emergency_shutdown()
void register_file_purge(DirState *target, long long size_in_st_blocks)

References XrdSysCondVar::Broadcast(), Debug, XrdPfc::File::initiate_emergency_shutdown(), XrdPfc::ResourceMonitor::register_file_purge(), RemoveWriteQEntriesFor(), XrdPfc::Info::s_infoExtension, stat, XrdOss::Stat(), TRACE, XrdOss::Unlink(), and XrdOssOK.

Referenced by ExecuteCommandUrl(), XrdPfcFSctl::FSctl(), Prepare(), and Unlink().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ VCheck()

static bool XrdPfc::Cache::VCheck ( XrdVersionInfo &  urVersion)
inlinestatic

Version check.

Definition at line 255 of file XrdPfc.hh.

255 { return true; }

◆ WriteCacheControlXAttr()

void Cache::WriteCacheControlXAttr ( int  cinfo_fd,
const char *  path,
const std::string &  cc 
)

Definition at line 921 of file XrdPfc.cc.

922 {
923  if (m_metaXattr) {
924  int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
925  if (res != 0) {
926  TRACE(Error, "WritecacheControlXAttr error setting xattr " << res);
927  }
928  }
929 }
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0

References Error, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WriteFileSizeXAttr()

void Cache::WriteFileSizeXAttr ( int  cinfo_fd,
long long  file_size 
)

Definition at line 934 of file XrdPfc.cc.

935 {
936  if (m_metaXattr) {
937  int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
938  if (res != 0) {
939  TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
940  }
941  }
942 }

References Debug, XrdSysXAttr::Set(), TRACE, and XrdSysXAttrActive.

+ Here is the call graph for this function:

◆ WritesSinceLastCall()

long long Cache::WritesSinceLastCall ( )

Definition at line 328 of file XrdPfc.cc.

329 {
330  // Called from ResourceMonitor for an alternative estimation of disk writes.
331  XrdSysCondVarHelper lock(&m_writeQ.condVar);
332  long long ret = m_writeQ.writes_between_purges;
333  m_writeQ.writes_between_purges = 0;
334  return ret;
335 }

Referenced by XrdPfc::ResourceMonitor::perform_purge_check().

+ Here is the caller graph for this function:

Member Data Documentation

◆ schedP

XrdScheduler * Cache::schedP = nullptr
static

The documentation for this class was generated from the following files: