XRootD
XrdPfc.hh
Go to the documentation of this file.
1 #ifndef __XRDPFC_CACHE_HH__
2 #define __XRDPFC_CACHE_HH__
3 //----------------------------------------------------------------------------------
4 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
5 // Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
6 //----------------------------------------------------------------------------------
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,fm_pu
13 // but WITHOUT ANY emacs WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //----------------------------------------------------------------------------------
20 #include <string>
21 #include <list>
22 #include <map>
23 #include <set>
24 
25 #include "Xrd/XrdScheduler.hh"
26 #include "XrdVersion.hh"
27 #include "XrdSys/XrdSysPthread.hh"
28 #include "XrdOuc/XrdOucCache.hh"
29 #include "XrdOuc/XrdOucCallBack.hh"
30 
31 #include "XrdPfcFile.hh"
32 #include "XrdPfcDecision.hh"
33 
34 class XrdOss;
35 class XrdOucStream;
36 class XrdSysError;
37 class XrdSysTrace;
38 class XrdXrootdGStream;
39 
40 namespace XrdPfc
41 {
42 class File;
43 class IO;
44 class PurgePin;
45 class ResourceMonitor;
46 
47 
48 template<class MOO>
49 struct MutexHolder {
50  MOO &mutex;
51  MutexHolder(MOO &m) : mutex(m) { mutex.Lock(); }
52  ~MutexHolder() { mutex.UnLock(); }
53 };
54 }
55 
56 
57 namespace XrdPfc
58 {
59 
60 //----------------------------------------------------------------------------
62 //----------------------------------------------------------------------------
64 {
65  Configuration();
66 
67  bool are_file_usage_limits_set() const { return m_fileUsageMax > 0; }
68  bool is_age_based_purge_in_effect() const { return m_purgeColdFilesAge > 0 ; }
69  bool is_uvkeep_purge_in_effect() const { return m_cs_UVKeep >= 0; }
70  bool is_dir_stat_reporting_on() const { return m_dirStatsMaxDepth >= 0 || ! m_dirStatsDirs.empty() || ! m_dirStatsDirGlobs.empty(); }
71  bool is_purge_plugin_set_up() const { return false; }
72 
74 
75  bool is_cschk_cache() const { return m_cs_Chk & CSChk_Cache; }
76  bool is_cschk_net() const { return m_cs_Chk & CSChk_Net; }
77  bool is_cschk_any() const { return m_cs_Chk & CSChk_Both; }
78  bool is_cschk_both() const { return (m_cs_Chk & CSChk_Both) == CSChk_Both; }
79 
80  bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const { return m_cs_Chk & ~cks_on_file; }
81 
82  bool should_uvkeep_purge(time_t delta) const { return m_cs_UVKeep >= 0 && delta > m_cs_UVKeep; }
83 
84  bool m_hdfsmode;
86 
87  std::string m_username;
88  std::string m_data_space;
89  std::string m_meta_space;
90 
91  long long m_diskTotalSpace;
92  long long m_diskUsageLWM;
93  long long m_diskUsageHWM;
94  long long m_fileUsageBaseline;
95  long long m_fileUsageNominal;
96  long long m_fileUsageMax;
101 
102  std::set<std::string> m_dirStatsDirs;
103  std::set<std::string> m_dirStatsDirGlobs;
106 
107  long long m_bufferSize;
108  long long m_RamAbsAvailable;
113 
114  long long m_hdfsbsize;
115  long long m_flushCnt;
116 
117  time_t m_cs_UVKeep;
118  int m_cs_Chk;
119  bool m_cs_ChkTLS;
120 
123 };
124 
125 //------------------------------------------------------------------------------
126 
128 {
129  std::string m_diskUsageLWM;
130  std::string m_diskUsageHWM;
131  std::string m_fileUsageBaseline;
132  std::string m_fileUsageNominal;
133  std::string m_fileUsageMax;
134  std::string m_flushRaw;
135 
137  m_diskUsageLWM("0.90"), m_diskUsageHWM("0.95"),
138  m_flushRaw("")
139  {}
140 };
141 
142 
143 //==============================================================================
144 // Cache
145 //==============================================================================
146 
147 //----------------------------------------------------------------------------
149 //----------------------------------------------------------------------------
150 class Cache : public XrdOucCache
151 {
152 public:
153  //---------------------------------------------------------------------
155  //---------------------------------------------------------------------
156  Cache(XrdSysLogger *logger, XrdOucEnv *env);
157 
158  //---------------------------------------------------------------------
160  //---------------------------------------------------------------------
161  using XrdOucCache::Attach;
162 
163  virtual XrdOucCacheIO *Attach(XrdOucCacheIO *, int Options = 0);
164 
165  //---------------------------------------------------------------------
166  // Virtual function of XrdOucCache. Used for redirection to a local
167  // file on a distributed FS.
168  virtual int LocalFilePath(const char *url, char *buff=0, int blen=0,
169  LFP_Reason why=ForAccess, bool forall=false);
170 
171  //---------------------------------------------------------------------
172  // Virtual function of XrdOucCache. Used for deferred open.
173  virtual int Prepare(const char *url, int oflags, mode_t mode);
174 
175  // virtual function of XrdOucCache.
176  virtual int Stat(const char *url, struct stat &sbuff);
177 
178  // virtual function of XrdOucCache.
179  virtual int Unlink(const char *url);
180 
181  //---------------------------------------------------------------------
182  // Used by PfcFstcl::Fsctl function.
183  // Test if file is cached taking in onlyifcached configuration parameters.
184  //---------------------------------------------------------------------
185  virtual int ConsiderCached(const char *url);
186 
187  bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk);
188  void WriteFileSizeXAttr(int cinfo_fd, long long file_size);
189  long long DetermineFullFileSize(const std::string &cinfo_fname);
190 
191  //--------------------------------------------------------------------
197  //--------------------------------------------------------------------
198  bool Decide(XrdOucCacheIO*);
199 
200  //------------------------------------------------------------------------
202  //------------------------------------------------------------------------
203  const Configuration& RefConfiguration() const { return m_configuration; }
204 
205  //---------------------------------------------------------------------
212  //---------------------------------------------------------------------
213  bool Config(const char *config_filename, const char *parameters);
214 
215  //---------------------------------------------------------------------
217  //---------------------------------------------------------------------
218  static Cache &CreateInstance(XrdSysLogger *logger, XrdOucEnv *env);
219 
220  //---------------------------------------------------------------------
222  //---------------------------------------------------------------------
223  static Cache &GetInstance();
224  static const Cache &TheOne();
225  static const Configuration &Conf();
226 
227  static ResourceMonitor &ResMon();
228 
229  //---------------------------------------------------------------------
231  //---------------------------------------------------------------------
232  static bool VCheck(XrdVersionInfo &urVersion) { return true; }
233 
234  //---------------------------------------------------------------------
236  //---------------------------------------------------------------------
237  int UnlinkFile(const std::string& f_name, bool fail_if_open);
238 
239  //---------------------------------------------------------------------
241  //---------------------------------------------------------------------
242  void AddWriteTask(Block* b, bool from_read);
243 
244  //---------------------------------------------------------------------
247  //---------------------------------------------------------------------
248  void RemoveWriteQEntriesFor(File *f);
249 
250  //---------------------------------------------------------------------
252  //---------------------------------------------------------------------
253  void ProcessWriteTasks();
254 
255  long long WritesSinceLastCall();
256 
257  char* RequestRAM(long long size);
258  void ReleaseRAM(char* buf, long long size);
259 
260  void RegisterPrefetchFile(File*);
262 
264 
265  void Prefetch();
266 
267  XrdOss* GetOss() const { return m_oss; }
268 
269  bool IsFileActiveOrPurgeProtected(const std::string&) const;
270  void ClearPurgeProtectedSet();
271  PurgePin* GetPurgePin() const { return m_purge_pin; }
272 
273  File* GetFile(const std::string&, IO*, long long off = 0, long long filesize = 0);
274 
275  void ReleaseFile(File*, IO*);
276 
277  void ScheduleFileSync(File* f) { schedule_file_sync(f, false, false); }
278 
279  void FileSyncDone(File*, bool high_debug);
280 
281  XrdSysError* GetLog() { return &m_log; }
282  XrdSysTrace* GetTrace() { return m_trace; }
283 
284  ResourceMonitor& RefResMon() { return *m_res_mon; }
285  XrdXrootdGStream* GetGStream() { return m_gstream; }
286 
287  void ExecuteCommandUrl(const std::string& command_url);
288 
290 
291 private:
292  bool ConfigParameters(std::string, XrdOucStream&, TmpConfiguration &tmpc);
293  bool ConfigXeq(char *, XrdOucStream &);
294  bool xcschk(XrdOucStream &);
295  bool xdlib(XrdOucStream &);
296  bool xplib(XrdOucStream &);
297  bool xtrace(XrdOucStream &);
298  bool test_oss_basics_and_features();
299 
300  bool cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name);
301 
302  static Cache *m_instance;
303 
304  XrdOucEnv *m_env;
305  XrdSysError m_log;
306  XrdSysTrace *m_trace;
307  const char *m_traceID;
308 
309  XrdOss *m_oss;
310 
311  XrdXrootdGStream *m_gstream;
312 
313  ResourceMonitor *m_res_mon;
314 
315  std::vector<Decision*> m_decisionpoints;
316  PurgePin* m_purge_pin;
317 
318  Configuration m_configuration;
319 
320  XrdSysCondVar m_prefetch_condVar;
321  bool m_prefetch_enabled;
322 
323  XrdSysMutex m_RAM_mutex;
324  long long m_RAM_used;
325  long long m_RAM_write_queue;
326  std::list<char*> m_RAM_std_blocks;
327  int m_RAM_std_size;
328 
329  bool m_isClient;
330  bool m_dataXattr = false;
331  bool m_metaXattr = false;
332 
333  struct WriteQ
334  {
335  WriteQ() : condVar(0), writes_between_purges(0), size(0) {}
336 
337  XrdSysCondVar condVar;
338  std::list<Block*> queue;
339  long long writes_between_purges;
340  int size;
341  };
342 
343  WriteQ m_writeQ;
344 
345  // active map, purge delay set
346  typedef std::map<std::string, File*> ActiveMap_t;
347  typedef ActiveMap_t::iterator ActiveMap_i;
348  typedef std::set<std::string> FNameSet_t;
349 
350  ActiveMap_t m_active;
351  FNameSet_t m_purge_delay_set;
352  mutable XrdSysCondVar m_active_cond;
353 
354  void inc_ref_cnt(File*, bool lock, bool high_debug);
355  void dec_ref_cnt(File*, bool high_debug);
356 
357  void schedule_file_sync(File*, bool ref_cnt_already_set, bool high_debug);
358 
359  // prefetching
360  typedef std::vector<File*> PrefetchList;
361  PrefetchList m_prefetchList;
362 };
363 
364 }
365 
366 #endif
int stat(const char *path, struct stat *buf)
XrdOucString File
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition: XrdPfc.hh:151
PurgePin * GetPurgePin() const
Definition: XrdPfc.hh:271
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition: XrdPfc.cc:911
void FileSyncDone(File *, bool high_debug)
Definition: XrdPfc.cc:543
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition: XrdPfc.cc:390
XrdOss * GetOss() const
Definition: XrdPfc.hh:267
static const Configuration & Conf()
Definition: XrdPfc.cc:135
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition: XrdPfc.cc:779
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1096
XrdSysTrace * GetTrace()
Definition: XrdPfc.hh:282
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition: XrdPfc.hh:203
static ResourceMonitor & ResMon()
Definition: XrdPfc.cc:136
XrdXrootdGStream * GetGStream()
Definition: XrdPfc.hh:285
bool IsFileActiveOrPurgeProtected(const std::string &) const
Definition: XrdPfc.cc:662
void ClearPurgeProtectedSet()
Definition: XrdPfc.cc:670
XrdSysError * GetLog()
Definition: XrdPfc.hh:281
void ReleaseRAM(char *buf, long long size)
Definition: XrdPfc.cc:372
virtual int ConsiderCached(const char *url)
Definition: XrdPfc.cc:985
static Cache & GetInstance()
Singleton access.
Definition: XrdPfc.cc:133
void DeRegisterPrefetchFile(File *)
Definition: XrdPfc.cc:696
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition: XrdPfc.cc:680
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition: XrdPfc.cc:896
void Prefetch()
Definition: XrdPfc.cc:737
void ReleaseFile(File *, IO *)
Definition: XrdPfc.cc:472
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition: XrdPfc.cc:222
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition: XrdPfc.cc:159
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition: XrdPfc.cc:138
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1163
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *ioP, int opts=0)=0
Obtain a new IO object that fronts existing XrdOucCacheIO.
static XrdScheduler * schedP
Definition: XrdPfc.hh:289
File * GetNextFileToPrefetch()
Definition: XrdPfc.cc:718
ResourceMonitor & RefResMon()
Definition: XrdPfc.hh:284
long long WritesSinceLastCall()
Definition: XrdPfc.cc:321
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition: XrdPfc.cc:274
virtual int Unlink(const char *url)
Definition: XrdPfc.cc:1153
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition: XrdPfc.cc:241
static const Cache & TheOne()
Definition: XrdPfc.cc:134
static bool VCheck(XrdVersionInfo &urVersion)
Version check.
Definition: XrdPfc.hh:232
char * RequestRAM(long long size)
Definition: XrdPfc.cc:332
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition: XrdPfc.cc:1045
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition: XrdPfc.cc:952
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition: XrdPfc.cc:126
void ScheduleFileSync(File *f)
Definition: XrdPfc.hh:277
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition: XrdPfcIO.hh:16
Base class for reguesting directory space to obtain.
Definition: XrdPfc.hh:41
@ CSChk_Both
Definition: XrdPfcTypes.hh:27
@ CSChk_Net
Definition: XrdPfcTypes.hh:27
@ CSChk_Cache
Definition: XrdPfcTypes.hh:27
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition: XrdPfc.hh:114
long long m_RamAbsAvailable
available from configuration
Definition: XrdPfc.hh:108
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition: XrdPfc.hh:115
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition: XrdPfc.hh:100
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition: XrdPfc.hh:80
int m_wqueue_threads
number of threads writing blocks to disk
Definition: XrdPfc.hh:111
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition: XrdPfc.hh:91
long long m_fileUsageMax
cache purge - files usage maximum
Definition: XrdPfc.hh:96
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition: XrdPfc.hh:94
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition: XrdPfc.hh:105
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition: XrdPfc.hh:85
bool is_purge_plugin_set_up() const
Definition: XrdPfc.hh:71
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition: XrdPfc.hh:93
bool is_cschk_cache() const
Definition: XrdPfc.hh:75
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition: XrdPfc.hh:103
CkSumCheck_e get_cs_Chk() const
Definition: XrdPfc.hh:73
bool is_uvkeep_purge_in_effect() const
Definition: XrdPfc.hh:69
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition: XrdPfc.hh:112
bool are_file_usage_limits_set() const
Definition: XrdPfc.hh:67
bool is_cschk_any() const
Definition: XrdPfc.hh:77
bool m_cs_ChkTLS
Allow TLS.
Definition: XrdPfc.hh:119
long long m_fileUsageNominal
cache purge - files usage nominal
Definition: XrdPfc.hh:95
int m_cs_Chk
Checksum check.
Definition: XrdPfc.hh:118
bool should_uvkeep_purge(time_t delta) const
Definition: XrdPfc.hh:82
int m_purgeAgeBasedPeriod
peform cold file / uvkeep purge every this many purge cycles
Definition: XrdPfc.hh:99
bool m_hdfsmode
flag for enabling block-level operation
Definition: XrdPfc.hh:84
int m_purgeColdFilesAge
purge files older than this age
Definition: XrdPfc.hh:98
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
bool is_cschk_both() const
Definition: XrdPfc.hh:78
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition: XrdPfc.hh:102
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition: XrdPfc.hh:92
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition: XrdPfc.hh:109
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition: XrdPfc.hh:110
bool is_age_based_purge_in_effect() const
Definition: XrdPfc.hh:68
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
bool is_cschk_net() const
Definition: XrdPfc.hh:76
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:122
time_t m_cs_UVKeep
unverified checksum cache keep
Definition: XrdPfc.hh:117
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition: XrdPfc.hh:104
int m_purgeInterval
sleep interval between cache purges
Definition: XrdPfc.hh:97
long long m_onlyIfCachedMinSize
minumum size of downloaded file, used by only-if-cached CGI option
Definition: XrdPfc.hh:121
bool is_dir_stat_reporting_on() const
Definition: XrdPfc.hh:70
MutexHolder(MOO &m)
Definition: XrdPfc.hh:51
std::string m_diskUsageLWM
Definition: XrdPfc.hh:129
std::string m_diskUsageHWM
Definition: XrdPfc.hh:130
std::string m_fileUsageBaseline
Definition: XrdPfc.hh:131
std::string m_fileUsageNominal
Definition: XrdPfc.hh:132
std::string m_flushRaw
Definition: XrdPfc.hh:134
std::string m_fileUsageMax
Definition: XrdPfc.hh:133