XRootD
XrdPfcCommand.cc
Go to the documentation of this file.
1 //----------------------------------------------------------------------------------
2 // Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3 // Author: Alja Mrak-Tadel
4 //----------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //----------------------------------------------------------------------------------
18 
19 #include "XrdPfcInfo.hh"
20 #include "XrdPfc.hh"
21 #include "XrdPfcTrace.hh"
22 #include "XrdPfcPathParseTools.hh"
23 #include "XrdPfcResourceMonitor.hh"
24 
25 #include "XrdOfs/XrdOfsConfigPI.hh"
26 #include "XrdOss/XrdOss.hh"
27 #include "XrdOuc/XrdOuca2x.hh"
28 #include "XrdOuc/XrdOucArgs.hh"
29 #include "XrdOuc/XrdOucEnv.hh"
30 #include "XrdOuc/XrdOucStream.hh"
31 #include "XrdSys/XrdSysLogger.hh"
33 
#include <algorithm>
#include <climits>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <vector>

#include <fcntl.h>
#include <sys/time.h>
40 
41 using namespace XrdPfc;
42 
43 //______________________________________________________________________________
44 
// Capacity of the per-command access_time / access_duration arrays used by
// the create_file command.
const int MAX_ACCESSES = 20;

// Binary size units; used for the default file size and for the upper limits
// passed when parsing the -s and -b options.
const long long ONE_MB = 1LL << 20;
const long long ONE_GB = 1LL << 30;
49 
50 void Cache::ExecuteCommandUrl(const std::string& command_url)
51 {
52  static const char *top_epfx = "ExecuteCommandUrl ";
53 
54  SplitParser cp(command_url, "/");
55 
56  std::string token = cp.get_token();
57 
58  if (token != "xrdpfc_command")
59  {
60  TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
61  return;
62  }
63 
64  // Get the command
65  token = cp.get_token();
66 
67 
68  //================================================================
69  // create_file
70  //================================================================
71 
72  if (token == "create_file")
73  {
74  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
75  static const char* usage =
76  "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
77  " Creates a cache file with given parameters. Data in file is random.\n"
78  " Useful for cache purge testing.\n"
79  "Notes:\n"
80  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
81  " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
82  " . -t and -d can be given multiple times to record several accesses.\n"
83  " . Negative arguments given to -t are interpreted as relative to now.\n";
84 
85  const Configuration &conf = m_configuration;
86 
87  token = cp.get_token();
88 
89  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
90 
91  std::vector<char*> argv;
92  SplitParser ap(token, " ");
93  int argc = ap.fill_argv(argv);
94 
95  long long file_size = ONE_GB;
96  long long block_size = conf.m_bufferSize;
97  int access_time [MAX_ACCESSES];
98  int access_duration[MAX_ACCESSES];
99  int at_count = 0, ad_count = 0;
100  XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
101  "help", 1, "h",
102  "verbose", 1, "v",
103  "size", 1, "s",
104  "blocksize", 1, "b",
105  "time", 1, "t",
106  "duration", 1, "d",
107  (const char *) 0);
108 
109  time_t time_now = time(0);
110 
111  Spec.Set(argc, &argv[0]);
112  char theOpt;
113 
114  while ((theOpt = Spec.getopt()) != (char) -1)
115  {
116  switch (theOpt)
117  {
118  case 'h': {
119  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
120  return;
121  }
122  case 's': {
123  if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
124  &file_size, 0ll, 32 * ONE_GB))
125  return;
126  break;
127  }
128  case 'b': {
129  if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
130  &block_size, 0ll, 64 * ONE_MB))
131  return;
132  break;
133  }
134  case 't': {
135  if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
136  &access_time[at_count++], INT_MIN, INT_MAX))
137  return;
138  break;
139  }
140  case 'd': {
141  if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
142  &access_duration[ad_count++], 0, 24 * 3600))
143  return;
144  break;
145  }
146  default: {
147  TRACE(Error, err_prefix << "Unhandled command argument.");
148  return;
149  }
150  }
151  }
152  if (Spec.getarg())
153  {
154  TRACE(Error, err_prefix << "Options must take up all the arguments.");
155  return;
156  }
157 
158  if (at_count < 1) access_time [at_count++] = time_now - 10;
159  if (ad_count < 1) access_duration[ad_count++] = 10;
160 
161  if (at_count != ad_count)
162  {
163  TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
164  return;
165  }
166 
167  std::string file_path (cp.get_reminder_with_delim());
168  std::string cinfo_path(file_path + Info::s_infoExtension);
169 
170  TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
171 
172  // Check if cinfo exists ... bail out if it does.
173  {
174  struct stat infoStat;
175  if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
176  {
177  TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
178  return;
179  }
180  }
181 
182  TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
183 
184  {
185  const char *myUser = conf.m_username.c_str();
186  XrdOucEnv myEnv;
187 
188  // Create the data file.
189 
190  char size_str[32]; sprintf(size_str, "%lld", file_size);
191  myEnv.Put("oss.asize", size_str);
192  myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
193  int cret;
194  if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
195  {
196  TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
197  return;
198  }
199 
200  XrdOssDF *myFile = GetOss()->newFile(myUser);
201  if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
202  {
203  TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
204  delete myFile;
205  return;
206  }
207 
208  // Create the info file.
209 
210  myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
211  myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
212  if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
213  {
214  TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
215  myFile->Close(); delete myFile;
216  return;
217  }
218 
219  XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
220  if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
221  {
222  TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
223  delete myInfoFile;
224  myFile->Close(); delete myFile;
225  return;
226  }
227 
228  // Allocate space for the data file.
229 
230  if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
231  {
232  TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
233  }
234 
235  // Fill up cinfo.
236 
237  Info myInfo(m_trace, false);
238  myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
239  myInfo.SetAllBitsSynced();
240 
241  for (int i = 0; i < at_count; ++i)
242  {
243  time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
244 
245  myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
246  }
247 
248  myInfo.Write(myInfoFile, cinfo_path.c_str());
249 
250  // Fake last modified time to the last access_time
251  {
252  time_t last_detach;
253  myInfo.GetLatestDetachTime(last_detach);
254  struct timespec acc_mod_time[2] = { {last_detach, UTIME_OMIT}, {last_detach, 0} };
255 
256  futimens(myInfoFile->getFD(), acc_mod_time);
257  }
258 
259  myInfoFile->Close(); delete myInfoFile;
260  myFile->Close(); delete myFile;
261 
262  TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
263 
264  {
265  XrdSysCondVarHelper lock(&m_writeQ.condVar);
266 
267  m_writeQ.writes_between_purges += file_size;
268  }
269  {
270  int token = m_res_mon->register_file_open(file_path, time_now, false);
271  XrdPfc::Stats stats;
272  stats.m_BytesWritten = file_size;
273  stats.m_StBlocksAdded = (file_size & 0x1ff) ? (file_size >> 9) + 1 : file_size >> 9;
274  m_res_mon->register_file_update_stats(token, stats);
275  m_res_mon->register_file_close(token, time(0), stats);
276  }
277  }
278  }
279 
280  //================================================================
281  // remove_file
282  //================================================================
283 
284  else if (token == "remove_file")
285  {
286  static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
287  static const char* usage =
288  "Usage: remove_file/ [-h] /<path>\n"
289  " Removes given file from the cache unless it is currently open.\n"
290  " Useful for removal of stale files or duplicate files in a caching cluster.\n"
291  "Notes:\n"
292  " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
293 
294  token = cp.get_token();
295 
296  TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
297 
298  std::vector<char*> argv;
299  SplitParser ap(token, " ");
300  int argc = ap.fill_argv(argv);
301 
302  XrdOucArgs Spec(&m_log, err_prefix, "h",
303  "help", 1, "h",
304  (const char *) 0);
305 
306  Spec.Set(argc, &argv[0]);
307  char theOpt;
308 
309  while ((theOpt = Spec.getopt()) != (char) -1)
310  {
311  switch (theOpt)
312  {
313  case 'h': {
314  m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
315  return;
316  }
317  default: {
318  TRACE(Error, err_prefix << "Unhandled command argument.");
319  return;
320  }
321  }
322  }
323  if (Spec.getarg())
324  {
325  TRACE(Error, err_prefix << "Options must take up all the arguments.");
326  return;
327  }
328 
329  std::string f_name(cp.get_reminder_with_delim());
330 
331  TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
332 
333  int ret = UnlinkFile(f_name, true);
334 
335  TRACE(Info, err_prefix << "returned with status " << ret);
336  }
337 
338  //================================================================
339  // unknown command
340  //================================================================
341 
342  else
343  {
344  TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
345  }
346 }
347 
348 
349 //==============================================================================
350 // Example python script to use /xrdpfc_command/
351 //==============================================================================
352 /*
353 from XRootD import client
354 from XRootD.client.flags import OpenFlags
355 
356 import sys
357 import time
358 
359 #-------------------------------------------------------------------------------
360 
361 port = int( sys.argv[1] );
362 
363 g_srv = "root://localhost:%d/" % port
364 g_com = "/xrdpfc_command/create_file/"
365 g_dir = "/store/user/matevz/"
366 
367 #-------------------------------------------------------------------------------
368 
369 def xxsend(args, file) :
370 
371  url = g_srv + g_com + args + g_dir + file
372  print "Opening ", url
373 
374  with client.File() as f:
375  status, response = f.open(url, OpenFlags.READ)
376 
377  print '%r' % status
378  print '%r' % response
379 
380 #-------------------------------------------------------------------------------
381 
382 pfx1 = "AAAA"
383 pfx2 = "BBBB"
384 
385 for i in range(1, 1024 + 1):
386 
387  atime = -10000 + i
388 
389  xxsend("-s 4g -t %d -d 10" % atime,
390  "%s-%04d" % (pfx1, i))
391 
392  time.sleep(0.01)
393 
394 
395 for i in range(1, 512 + 1):
396 
397  atime = -5000 + i
398 
399  xxsend("-s 4g -t %d -d 10" % atime,
400  "%s-%04d" % (pfx2, i))
401 
402  time.sleep(0.01)
403  */
void usage()
#define XrdOssOK
Definition: XrdOss.hh:50
#define XRDOSS_mkpath
Definition: XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
Definition: XrdPfcTrace.hh:46
int stat(const char *path, struct stat *buf)
bool Create
#define TRACE(act, x)
Definition: XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition: XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition: XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
char getopt()
Definition: XrdOucArgs.cc:151
char * getarg()
Definition: XrdOucArgs.cc:136
void Set(char *arglist)
Definition: XrdOucArgs.cc:236
void Put(const char *varname, const char *value)
Definition: XrdOucEnv.hh:85
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition: XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
XrdOss * GetOss() const
Definition: XrdPfc.hh:267
virtual int Stat(const char *url, struct stat &sbuff)
Definition: XrdPfc.cc:1096
void ExecuteCommandUrl(const std::string &command_url)
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition: XrdPfc.cc:1163
Status of cached file. Can be read from and written into a binary file.
Definition: XrdPfcInfo.hh:41
static const char * s_infoExtension
Definition: XrdPfcInfo.hh:309
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
Definition: XrdPfcInfo.cc:446
bool GetLatestDetachTime(time_t &t) const
Get latest detach time.
Definition: XrdPfcInfo.cc:472
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
Definition: XrdPfcInfo.cc:268
void SetAllBitsSynced()
Mark all blocks as synced to disk.
Definition: XrdPfcInfo.cc:146
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
Definition: XrdPfcInfo.cc:163
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
Definition: XrdPfcStats.hh:35
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
Definition: XrdPfcStats.hh:43
long long m_BytesWritten
number of bytes written to disk
Definition: XrdPfcStats.hh:42
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
Definition: XrdPfc.hh:41
Contains parameters configurable from the xrootd config file.
Definition: XrdPfc.hh:64
std::string m_data_space
oss space for data files
Definition: XrdPfc.hh:88
long long m_bufferSize
prefetch buffer size, default 1MB
Definition: XrdPfc.hh:107
std::string m_meta_space
oss space for metadata files (cinfo)
Definition: XrdPfc.hh:89
std::string m_username
username passed to oss plugin
Definition: XrdPfc.hh:87
int fill_argv(std::vector< char * > &argv)