XRootD
XrdOssArcBackup.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s A r c B a c k u p . c c */
4 /* */
5 /* (c) 2024 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <sys/param.h>
32 #include <sys/stat.h>
33 
34 #include "Xrd/XrdScheduler.hh"
35 
36 #include "XrdOss/XrdOss.hh"
37 
44 
45 #include "XrdOuc/XrdOucProg.hh"
46 #include "XrdOuc/XrdOucUtils.hh"
47 #include "XrdOuc/XrdOucStream.hh"
48 
49 #include "XrdSys/XrdSysError.hh"
50 #include "XrdSys/XrdSysPlatform.hh"
51 #include "XrdSys/XrdSysPthread.hh"
52 
53 /******************************************************************************/
54 /* G l o b a l O b j e c t s */
55 /******************************************************************************/
56 
57 namespace XrdOssArcGlobals
58 {
59 extern XrdOss* ossP;
60 
61 extern XrdScheduler* schedP;
62 
63 extern XrdOssArcConfig Config;
64 
66 
68 
69 extern XrdOssArcFSMon fsMon;
70 }
71 using namespace XrdOssArcGlobals;
72 
73 /******************************************************************************/
74 /* S t a t i c M e m b e r s */
75 /******************************************************************************/
76 
77 XrdSysMutex XrdOssArcBackup::dsBkpQMtx;
78 std::deque<XrdOssArcBackupTask*>
79  XrdOssArcBackup::dsBkpQ;
80 XrdSysCondVar2 XrdOssArcBackup::dsBkpQCV(XrdOssArcBackup::dsBkpQMtx);
81 int XrdOssArcBackup::numRunning = 0;
82 int XrdOssArcBackup::maxRunning = 0;
83 
84 
85 /******************************************************************************/
86 /* C l a s s X r d O s s A r c B a c k u p T a s k */
87 /******************************************************************************/
88 /******************************************************************************/
89 /* C o n s t r u c t o r & D e s t r u c t o r */
90 /******************************************************************************/
91 
93  : Owner(who), theScope(who.Scope), theDSN(dsn),
94  numBytes(0), numFiles(0), relSpace(false),
95  btSem(0) {}
96 
98 {
99 // Remove entry of the dataset name from the owner's set
100 //
101  Owner.dsBkpSetMtx.Lock();
102  Owner.dsBkpSet.erase(theDSN);
103  Owner.dsBkpSetMtx.UnLock();
104 
105 // Release the space that this request acquired
106 //
108 
109 // Since theDSN was shared with the owner's set we can free the storage
110 //
111  free(theDSN);
112 }
113 
114 /******************************************************************************/
115 /* X r d O s s A r c B a c k u p T a s k : : B k p X e q */
116 /******************************************************************************/
117 
119 {
120 // The first step is to set up the staging area to create an archive. This is
121 // done using a script. The script returns the total number of files and bytes
122 // that will need to be backed up.
123 
124 // Construct the staging path for archive creation management
125 //
126  TraceInfo("BkpTask", 0);
127  XrdOucStream cmdSup;
128  char dsnDir[MAXPATHLEN], manPFN[MAXPATHLEN];
129  int n, rc;
130 
131 // We create an instance of the stop monitor. If a stop is in effect, it
132 // will not complete construction until the stop file is removed.
133 //
134  XrdOssArcStopMon stopMon(Config.stopMon);
135 
136 // Compose the arena path. Note that our arena path already ends with a slash
137 //
138  n = snprintf(dsnDir, sizeof(dsnDir), "%s%s/", Owner.Arena(),
140  if (n >= (int)sizeof(dsnDir))
141  {Elog.Emsg("Backup",ENAMETOOLONG,"create arena path for dataset",theDSN);
142  return false;
143  }
144 
145 // We now must create the directory path to the arena
146 //
147  if ((rc = XrdOucUtils::makePath(dsnDir, S_IRWXU|S_IRGRP|S_IXGRP)))
148  {Elog.Emsg("Backup", rc, "create dataset backup arena", dsnDir);
149  return false;
150  }
151 
152 // Generate name of the manifest file. This is needed by the preparc and
153 // postarc utilities plus we want to add it to the zip archive as provenance.
154 //
155  n = snprintf(manPFN, sizeof(manPFN), "%sManifest", dsnDir);
156  if (n >= (int)sizeof(manPFN))
157  {Elog.Emsg("Backup",ENAMETOOLONG,"create bkp manifest for dataset",
158  theDSN);
159  return false;
160  }
161 
162 // Construct the argument list
163 //
164  const char* supArgv[] = {"setup", Config.srcRSE, theScope, theDSN, dsnDir,
165  Config.srcData, manPFN};
166  int supArgc = sizeof(supArgv)/sizeof(char*);
167 
168 // Do some tracing
169 //
170  DEBUG("Running "<<Config.BkpUtilName<<' '<<supArgv[0]<<' '<<supArgv[1]<<
171  ' '<<supArgv[2]<<' '<<supArgv[3]<<' '<<supArgv[4]<<
172  ' '<<supArgv[5]<<' '<<supArgv[6]);
173 
174 // Run the setup script which prepares the dataset for archiving. It should
175 // output a single line: <files> <bytes>
176 //
177  if (!(rc = Config.BkpUtilProg->Run(&cmdSup, supArgv, supArgc)))
178  {char *lp, *retStr = 0;
179  size_t vb;
180  int vf;
181  bool isOK = false;
182  while((lp = cmdSup.GetLine())) if (!retStr) retStr = strdup(lp);
183 
184  if (retStr)
185  {n = sscanf(retStr, "%d %zu", &vf, &vb);
186  if (n == 2) {numFiles = vf, numBytes = vb; isOK = true;}
187  else {char etxt[1024];
188  snprintf(etxt, sizeof(etxt),
189  "%s setup returned bad output '",
190  Config.BkpUtilName);
191  Elog.Emsg("Backup", etxt, retStr,"'");
192  }
193  free(retStr);
194  } else {
195  Elog.Emsg("Backup",Config.BkpUtilName,"setup returned no output");
196  return false;
197  }
198 
199  Config.BkpUtilProg->RunDone(cmdSup); // This may kill the process
200  if (!isOK) return false;
201  } else {
202  Elog.Emsg("Backup",rc, "run setup via", Config.BkpUtilName);
203  return false;
204  }
205 
206 // We can only proceed if there is enough space to hold the backup. This only
207 // applies to local backups. Remote backups have unlimited space.
208 //
209  if (Config.bkpLocal)
210  while(!fsMon.Permit(this))
211  {char buff[1024];
212  snprintf(buff,sizeof(buff),"Insufficient free space; defering "
213  "archiving of %s:%s", theScope, theDSN);
214  Elog.Emsg("BkpXeq", buff);
215  stopMon.Deactivate();
216  btSem.Wait();
217  stopMon.Activate();
218  snprintf(buff,sizeof(buff),"Retrying to archive %s:%s",
219  theScope, theDSN);
220  }
221 
222 // Before we create the archive, check if we must run a pre-archive utility.
223 // These utilities usually pre-fetch the files that we will be archiving.
224 //
225  if (Config.PrepArcProg)
226  {XrdOucStream prpSup;
227  const char* prpArgv[] = {"prepare", theScope, manPFN};
228  int prpArgc = sizeof(prpArgv)/sizeof(char*);
229 
230  DEBUG("Running "<<Config.PrepArcName<<' '<<prpArgv[0]<<' '<<prpArgv[1]
231  <<' '<<prpArgv[2]);
232 
233  if (!(rc = Config.PrepArcProg->Run(&prpSup, prpArgv, prpArgc)))
234  {char* lp;
235  while((lp = prpSup.GetLine())) {}
236  Config.PrepArcProg->RunDone(prpSup); // This may kill the process
237  } else {
238  Elog.Emsg("Backup", rc, "run preparc", Config.PrepArcName);
239  return false;
240  }
241  }
242 
243 // Run the archive script
244 //
245  if (!Owner.Archive(theDSN, dsnDir)) return false;
246 
247 // Run post-archive script if we need to
248 //
249  if (Config.PostArcProg)
250  {XrdOucStream pstSup;
251  const char* pstArgv[] = {"dispose", theScope, manPFN};
252  int pstArgc = sizeof(pstArgv)/sizeof(char*);
253 
254  DEBUG("Running "<<Config.PrepArcName<<' '<<pstArgv[0]<<' '<<pstArgv[1]
255  <<' '<<pstArgv[2]);
256 
257  if (!(rc = Config.PostArcProg->Run(&pstSup, pstArgv, pstArgc)))
258  {char* lp;
259  while((lp = pstSup.GetLine())) {}
260  Config.PostArcProg->RunDone(pstSup); // This may kill the process
261  } else {
262  Elog.Emsg("Backup", rc, "run postarc", Config.PostArcName);
263  return false;
264  }
265  }
266 
267 // We can now safely mark this dataset as having been backed up
268 //
269  XrdOucStream cmdFin;
270  const char* finArgv[] = {"finish", Config.srcRSE, theScope, theDSN, dsnDir,
271  Config.metaBKP, Config.doneBKP};
272  int finArgc = sizeof(finArgv)/sizeof(char*);
273 
274 // If the debug setting indicates we need to save the setup, disallow delete
275 //
276  if (TRACING(TRACE_Save)) finArgv[4] = "";
277 
278 // Do some tracing
279 //
280  DEBUG("Running "<<Config.BkpUtilName<<' '<<finArgv[0]<<' '<<finArgv[1]<<
281  ' '<<finArgv[2]<<' '<<finArgv[3]<<' '<<
282  (*finArgv[4] ? finArgv[4] : "n/d")<<
283  ' '<<finArgv[5]<<' '<<finArgv[6]);
284 
285 // Run the finish script which sets the dataset backup metadata to completed
286 //
287  if (!(rc = Config.BkpUtilProg->Run(&cmdFin, finArgv, finArgc)))
288  {while((cmdFin.GetLine())) {}
289  Config.BkpUtilProg->RunDone(cmdFin); // This may kill the process
290  } else {
291  Elog.Emsg("Backup",rc, "run finish via", Config.BkpUtilName);
292  return false;
293  }
294 
295 // All done
296 //
297  return true;
298 }
299 
300 /******************************************************************************/
301 /* c l a s s X r d O s s A r c B a c k u p */
302 /******************************************************************************/
303 /******************************************************************************/
304 /* X r d O s s A r c B a c k u p : : B k p W o r k e r : : D o I t */
305 /******************************************************************************/
306 
308 {
309 // Get the initial lock
310 //
311  dsBkpQMtx.Lock();
312 
313 // Get a backup task and execute it
314 //
315 do{if (!dsBkpQ.empty())
316  {XrdOssArcBackupTask* bTask = dsBkpQ.front();
317  dsBkpQ.pop_front();
318  dsBkpQMtx.UnLock();
319  bool isOK = bTask->BkpXeq();
320  dsBkpQMtx.Lock();
321 
322  char buff[1024];
323  snprintf(buff,sizeof(buff),"%s:%s",bTask->theScope,bTask->theDSN);
324  if (isOK) Elog.Emsg("BkpWorker", buff, "backed up!");
325  else Elog.Emsg("BkpWorker", buff, "backup failed; will retry later");
326 
327  delete bTask; // We may implement bTask->retries at some point???
328  } else {
329  numRunning--;
330  dsBkpQCV.Wait(); // This unlocks dsBkpQMtx
331  }
332  } while(true);
333 }
334 
335 /******************************************************************************/
336 /* c l a s s X r d O s s A r c B a c k u p */
337 /******************************************************************************/
338 /******************************************************************************/
339 /* C o n s t r u c t o r */
340 /******************************************************************************/
341 
342 XrdOssArcBackup::XrdOssArcBackup(const char *scp, bool& isOK)
343  : XrdJob("Backup"), Scope(scp)
344 {
345  char abuff[1024];
346  int rc;
347 
348 // Construct the arena where our backups will be staged
349 //
350  snprintf(abuff, sizeof(abuff), "%s%s/",Config.dsetRepoPFN,scp);
351  rc = XrdOucUtils::makePath(abuff, S_IRWXU|S_IRGRP|S_IXGRP);
352  if (rc)
353  {Elog.Emsg("Backup", rc, "create arena", abuff);
354  isOK = false;
355  } else {
356  myArena = strdup(abuff);
357  isOK = true;
358  }
359 }
360 
361 /******************************************************************************/
362 /* Private: A d d 2 B k p */
363 /******************************************************************************/
364 
365 bool XrdOssArcBackup::Add2Bkp(const char* dsn)
366 {
367  XrdSysMutexHelper mHelp(dsBkpSetMtx);
368  char* theDSN = strdup(dsn);
369 
370 // Add this dataset to our backup set and return whether or not it is new.
371 // If the dataset is new then place a task for it on the global work queue
372 // and if it can be immediately serviced, signal a waiting thread to do so.
373 //
374  auto rslt = dsBkpSet.insert(theDSN);
375  if (!rslt.second) free(theDSN);
376  else {XrdOssArcBackupTask* theTask = new XrdOssArcBackupTask(*this,theDSN);
377  dsBkpQMtx.Lock();
378  dsBkpQ.push_back(theTask);
379  if (numRunning < maxRunning) dsBkpQCV.Signal();
380  dsBkpQMtx.UnLock();
381  }
382  return rslt.second;
383 }
384 
385 /******************************************************************************/
386 /* A r c h i v e */
387 /******************************************************************************/
388 
389 bool XrdOssArcBackup::Archive(const char* dsName, const char* dsDir)
390 {
391  TraceInfo("Archive",0);
392  XrdOucStream cmdOut;
393  char tapPath[MAXPATHLEN];
394  int n, rc;
395 
396 // All we need to do is launch the archive program to complete the steps:
397 // 1. Create the zip file of all files in the dataset.
398 // 2. Move the zip file to the <tape_dir>.
399 // 3. Do a recursvive delete starting at and including <src_dir>.
400 // 4. Delete file <trg_dir>/<zipfn>.
401 
402 // The calling parameters are:
403 // <src_dir> <tape_dir> <arcfn> [{<arcpy> | ""} [<arcdsp> <manifest>]]
404 //
405 // <src_dir>: The directory containing all of the files in the dataset.
406 // This is apssed as a PFN via dsDir parameter.
407 // <tape_dir>: The directory to hold the zip archive destined to tape.
408 // We need to build this using the dsName parameter.
409 // <arcfn>: The actual filename to be used for the archive. By convention
410 // the archive is created as '<src_dir>/../<arcfn>'.
411 //[<arcpy>] Optional parameter to drive remote mode backups.
412 // 0 1 2 3
413  const char* argV[] = {dsDir, tapPath, Config.arFName, Config.ArchiverSave};
414 // 4 5
415  if (Config.bkpLocal)
416  {n = snprintf(tapPath, sizeof(tapPath), "%s/%s/%s",
417  Config.tapePath, Scope, dsName);
418  argV[3] = "";
419  } else {
420  n = snprintf(tapPath, sizeof(tapPath), "%s/%s", Scope, dsName);
421  argV[3] = Config.ArchiverSave;
422  }
423 
424 
425 
426 // Verify we didn't truncate the path
427 //
428  if (n >= (int)sizeof(tapPath))
429  {rc = -ENAMETOOLONG;
430  snprintf(tapPath, sizeof(tapPath), "%s:%s", Scope, dsName);
431  Elog.Emsg("Archive", rc, "generate tape path for dataset", tapPath);
432  Elog.Emsg("Archive","Dataset",dsName,"needs manual intervention!!!");
433  return false;
434  }
435 
436 // Do some tracing
437 //
438  DEBUG("Running "<<Config.ArchiverName<<' '<<argV[0]<<' '
439  <<argV[1]<<' '<<argV[2]<<' '<<argV[3]);
440 
441 // Run the archive script.
442 //
443  n = sizeof(argV)/sizeof(char*);
444  if (!(rc = Config.ArchiverProg->Run(&cmdOut, argV, n)))
445  {char* lp;
446  while((lp = cmdOut.GetLine())) {} // Throw away stdout
447  rc = Config.ArchiverProg->RunDone(cmdOut);
448  }
449 
450 // Check for any failures
451 //
452  if (rc)
453  {char rcVal[32];
454  snprintf(rcVal, sizeof(rcVal),"%d",rc);
455  Elog.Emsg("Archive", "Archive script failed with rc=", rcVal);
456  Elog.Emsg("Archive", "Dataset", dsName, "needs manual intervention!!!");
457  return false;
458  }
459 
460  return true;
461 }
462 
463 /******************************************************************************/
464 /* D o I t */
465 /******************************************************************************/
466 
468 {
469  // Do a backup round and then reschedule for the next one
470  //
471  GetManifest();
472 
473  schedP->Schedule(this, time(0)+Config.bkpPoll);
474 }
475 
476 /******************************************************************************/
477 /* G e t M a n i f e s t */
478 /******************************************************************************/
479 
480 int XrdOssArcBackup::GetManifest()
481 {
482  static const char* manEOL = "%%%";
483  static const char* lsbArgv[] = {"list", Config.metaBKP, Config.needBKP,
484  Scope, manEOL};
485  static int lsbArgc = sizeof(lsbArgv)/sizeof(char*);
486  static XrdSysMutex manMutex;
487 
488  TraceInfo("GetManifest",0);
489  XrdOucStream cmdOut;
490  int rc, dsCnt, dsNew = 0;
491  bool isEOF = false;
492 
493 // Here we launch the BkpUtils program to tell us the list of datasets that
494 // need to be backed up by this RSE. The BkpUtils program writes newline
495 // deparated dataset did's to stdout. Error messages are written to stderr.
496 // The final line conatins '%%%' indicating the actual end of output. If we
497 // don't get that we warn that the list is incomplete but use what we have.
498 
499 // The calling parameteris are:
500 // lsb <scope> <metavar> <metaval> <eolmarker>
501 
502 // Do some tracing
503 //
504  DEBUG("Running "<<Config.BkpUtilName<<' '<<lsbArgv[0]<<' '<<lsbArgv[1]<<
505  ' '<<lsbArgv[2]<<' '<<lsbArgv[3]<<' '<<lsbArgv[4]);
506 
507 // To avoid placing a huge load on the dataset we will be querying, only one
508 // manifest request can run at a time.
509 //
510  manMutex.Lock();
511 
512 // Run the manifest script.
513 //
514  if (!(rc = Config.BkpUtilProg->Run(&cmdOut, lsbArgv, lsbArgc)))
515  {char* lp;
516  while((lp = cmdOut.GetLine()))
517  {if (*lp == *manEOL && !strcmp(lp, manEOL))
518  {isEOF = true;
519  break;
520  }
521  if (Add2Bkp(lp)) dsNew++;
522  }
523  Config.BkpUtilProg->RunDone(cmdOut); // This may kill the process
524  }
525 
526 // We are done running this program
527 //
528  manMutex.UnLock();
529 
530 // Check if we really got an eof
531 //
532  if (!isEOF)
533  {char buff[16];
534  snprintf(buff, sizeof(buff),"%d",rc);
535  Elog.Emsg("GetManifest","Premature EOF when reading manifest; rc=",buff);
536  }
537 
538 // Get the number of entries in the backup set
539 //
540  dsBkpSetMtx.Lock();
541  dsCnt = dsBkpSet.size();
542  dsBkpSetMtx.UnLock();
543 
544 // Do some tracing here
545 //
546  DEBUG("Scope "<<Scope<<" has "<<dsCnt
547  <<" dataset(s) needing backup of which "<<dsNew<<" are new");
548 
549 // Return the number of datasets in the backup list
550 //
551  return dsCnt;
552 }
553 
554 /******************************************************************************/
555 /* S t a r t W o r k e r s */
556 /******************************************************************************/
557 
559 {
560  TraceInfo("StartWorkers",0);
561  numRunning = maxRunning = maxw;
562 
563 // Start all of the workers, they will immediately go idle.
564 // This is a one time call from config.
565 //
566  for (int i = 0; i < maxw; i++)
567  {XrdJob* bJob = new BkpWorker();
568  schedP->Schedule(bJob);
569  }
570 
571 // Do some tracing
572 //
573  DEBUG("Started "<<maxw<<" backup workers.");
574 }
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
Definition: XrdAccTest.cc:262
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define TraceInfo(x, y)
#define TRACE_Save
#define TRACING(x)
Definition: XrdTrace.hh:70
Definition: XrdJob.hh:43
XrdOssArcBackupTask(XrdOssArcBackup &who, char *dsn)
XrdOssArcBackup & Owner
XrdSysSemaphore btSem
void DoIt() override
XrdOssArcBackup(const char *scp, bool &isOK)
const char * Arena()
bool Archive(const char *dsName, const char *dsDir)
friend class XrdOssArcBackupTask
static void StartWorkers(int maxw)
static std::string DSN2Dir(const char *dsn)
void Release(size_t bytes)
bool Permit(XrdOssArcBackupTask *btP)
char * GetLine()
static int makePath(char *path, mode_t mode, bool reset=false)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
XrdCmsConfig Config
XrdScheduler * schedP
XrdOssArcConfig Config
Definition: XrdOssArc.cc:68
XrdOss * ossP
Definition: XrdOssArc.cc:64
XrdSysTrace ArcTrace("OssArc")
XrdScheduler * schedP
Definition: XrdOssArc.cc:66
XrdOssArcFSMon fsMon
XrdSysError Elog(0, "OssArc_")