XRootD
XrdOssArcStage.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d O s s A r c S t a g e . c c */
4 /* */
5 /* (c) 2024 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <queue>
32 #include <string>
33 #include <set>
34 
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <stdlib.h>
38 
39 #include "Xrd/XrdScheduler.hh"
43 #include "XrdOuc/XrdOucEnv.hh"
44 #include "XrdOuc/XrdOucProg.hh"
45 #include "XrdSys/XrdSysFD.hh"
46 #include "XrdSys/XrdSysPlatform.hh"
47 #include "XrdSys/XrdSysPthread.hh"
48 
49 /******************************************************************************/
50 /* S t a t i c O b j e c t s */
51 /******************************************************************************/
52 
53 namespace XrdOssArcGlobals
54 {
55 extern XrdScheduler* schedP;
56 
57 extern XrdSysError Elog;
58 
59 extern XrdOssArcConfig Config;
60 
63 
// Describes one staging request, keyed by its path.
//
// rc    0 when constructed; StageError() stores the failure code here and
//       Stage() treats a zero value as "still in progress".
// path  The path used as the lookup key (see cmpLess). Either the caller's
//       pointer (cpy == false) or a private copy (cpy == true).
// pMem  The private copy made by strdup() when cpy == true, otherwise 0.
//       It is released with free() by the destructor.
//
// Because an instance may own pMem, copying is disabled: an implicit
// member-wise copy of an owning instance would free the same memory twice.
//
struct ActInfo
      {int         rc;
       const char* path;
       char*       pMem;

       ActInfo(const char* p, bool cpy=false) : rc(0), path(p), pMem(0)
              {if (cpy) path = pMem = strdup(p);}

      ~ActInfo() {if (pMem) free(pMem);}

       ActInfo(const ActInfo&) = delete;
       ActInfo& operator=(const ActInfo&) = delete;
      };
76 
77 bool cmpLess(const ActInfo* a, const ActInfo* b)
78  {return strcmp(a->path, b->path) < 0;}
79 
// Set of staging requests currently known, ordered by path via cmpLess.
// The code in this file searches and modifies it while holding stageMtx.
//
80 std::set<ActInfo*, decltype(&cmpLess)> Active(&cmpLess);
81 
// Paths waiting for a staging slot. Stage() pushes here when Config.maxStage
// is zero and DoIt() pops from it; both do so while holding schedMtx.
//
82 std::queue<const char*> Pending;
83 }
84 using namespace XrdOssArcGlobals;
85 
86 /******************************************************************************/
87 /* D o I t */
88 /******************************************************************************/
89 
91 {
// Stage the file named by arcvPath, then keep taking paths off the Pending
// queue and staging them until the queue is empty. When it is, return the
// staging slot (Config.maxStage is incremented) and delete this object.
//
92  TraceInfo("Bring_Online",0);
93  const char* nxtPath;
94  time_t seTime;
95  int fd;
96 
97 // The arcvPath is the path of the file in the locally mounted tape buffer.
98 // Simply open it to force it to be staged online.
99 //
// On failure StageError() records the error and clears arcvPath, so the
// Reset() call that follows leaves the failed entry in the active set.
//
100 do{DEBUG("Staging "<<arcvPath);
101  seTime = time(0);
102  if ((fd = XrdSysFD_Open(arcvPath, O_RDONLY)) < 0)
103  StageError(errno, "open/stage file", arcvPath);
104  else {close(fd);
105  seTime = time(0) - seTime;
106  DEBUG(arcvPath<<" staged in "<<seTime<<" second(s)");
107  }
108 
109 // Check if there is something pending that we can do now
110 //
// Note: the break below leaves the loop with schedMtx still locked; the
// code after the loop relies on that and is what unlocks it.
//
111  schedMtx.Lock();
112  if (Pending.empty()) break;
113  nxtPath = Pending.front();
114  Pending.pop();
115  schedMtx.UnLock();
116 
117 // Reset this object to handle the path
118 //
119  Reset(nxtPath);
120 
121  } while(true);
122 
123 // Do final reset as we finished processing
124 //
125  Reset(0);
126 
127 // We are done, so delete this object and return
128 //
129  int n = Config.maxStage++; // schedMtx is still held
130  schedMtx.UnLock();
131  DEBUG("Staging queue empty; MaxStage="<<n+1);
132  delete this;
133 }
134 
135 /******************************************************************************/
136 /* Private: i s O n l i n e */
137 /******************************************************************************/
138 
140 {
// Run the configured MSS command with the "online" request for 'path' and
// convert its result to an MssRC: a result outside [-1, 1] becomes -1,
// otherwise the result is negated before the cast.
//
141  TraceInfo("isOnline",0);
142  int rc, finrc;
143 
144  DEBUG("Running "<<Config.MssComName<<" online "<<path);
145  rc = Config.MssComProg->Run("online", path);
146 
147 // Adjust return code. Note that XrdOucProg returns -status!
148 //
149  if (rc < -1 || rc > 1) finrc = -1;
150  else finrc = -rc;
151  DEBUG("MssComCmd returned "<<rc<<" -> "<<finrc);
152 
153  return static_cast<MssRC>(finrc);
154 }
155 
156 /******************************************************************************/
157 /* Private: R e s e t */
158 /******************************************************************************/
159 
160 void XrdOssArcStage::Reset(const char* path)
161 {
162 
163 // Remove ourselves from the active set if we still have a path
164 //
165  if (arcvPath)
166  {ActInfo aInfo(arcvPath);
167  stageMtx.Lock();
168  auto it = Active.find(&aInfo);
169  if (it != Active.end())
170  {ActInfo* aiP = *it;
171  Active.erase(it);
172  delete aiP;
173  }
174  stageMtx.UnLock();
175  }
176 
177 // Replace out path with the new path
178 //
179  arcvPath = path;
180 }
181 
182 /******************************************************************************/
183 /* S t a g e */
184 /******************************************************************************/
185 
186 int XrdOssArcStage::Stage(const char *path, const char* mssPath)
187 {
188  TraceInfo("Stage",0);
189  ActInfo aInfo(path);
190 
191 // Check if this is being staged
192 //
193  stageMtx.Lock();
194  auto it = Active.find(&aInfo);
195  if (it != Active.end())
196  {int rc;
197  if ((*it)->rc == 0) rc = EINPROGRESS;
198  else {rc = (*it)->rc;
199  ActInfo* aiP = *it;
200  Active.erase(it);
201  delete aiP;
202  }
203  stageMtx.UnLock();
204  return rc;
205  } else stageMtx.UnLock();
206 
207 // Make sure the path exists and is actually online
208 //
209  MssRC mssRC = isOnline(mssPath);
210  switch(mssRC)
211  {case isFalse: break;
212  case isTrue: return 0; break;
213  default: return EINVAL; break;
214  }
215 
216 // Create a an action information object. This will copy the path and we
217 // can use the copy in other places as the pointer i
218 //
219  ActInfo* stageInfo = new ActInfo(path, true);
220 
221 // Add the path to the staging set. Another thread may have beat us to it.
222 //
223  stageMtx.Lock();
224  auto iResult = Active.insert(stageInfo);
225  stageMtx.UnLock();
226  if (!iResult.second)
227  {delete stageInfo;
228  return EINPROGRESS;
229  }
230 
231 // Schedule this staging request if we are allowed to do so
232 //
233  int smx;
234  schedMtx.Lock();
235  if (Config.maxStage)
236  {smx = Config.maxStage--;
237  schedMtx.UnLock();
238  XrdOssArcStage *asP = new XrdOssArcStage(stageInfo->path);
239  schedP->Schedule((XrdJob*)asP);
240  return EINPROGRESS;
241  } else smx = 0;
242 
243 // Too many things being staged, so queue this request
244 //
245  Pending.push(stageInfo->path);
246  schedMtx.UnLock();
247 
248 // Do some debugging
249 //
250  DEBUG("MaxStage="<<smx<<" staging '"<<path<<(smx?"' scheduled":"' queued"));
251 
252 // All done
253 //
254  return EINPROGRESS;
255 }
256 
257 /******************************************************************************/
258 /* Private: S t a g e E r r o r */
259 /******************************************************************************/
260 
261 void XrdOssArcStage::StageError(int rc, const char* what, const char* path)
262 {
263  ActInfo aInfo(arcvPath);
264 
265 // Flag this request as failed
266 //
267  stageMtx.Lock();
268  auto it = Active.find(&aInfo);
269  stageMtx.UnLock();
270  if (it != Active.end()) (*it)->rc = rc;
271 
272 // Issue error message
273 //
274  Elog.Emsg("Stage", rc, what, path);
275 
276 // We now must clear our arcvPath to prevent removal from the active set
277 //
278  arcvPath = 0;
279 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
#define TraceInfo(x, y)
#define close(a)
Definition: XrdPosix.hh:48
Definition: XrdJob.hh:43
XrdOucProg * MssComProg
const char * MssComName
virtual void DoIt() override
static MssRC isOnline(const char *path)
static int Stage(const char *path, const char *mssPath)
int Run(XrdOucStream *Sp, const char *argV[], int argc=0, const char *envV[]=0) const
Definition: XrdOucProg.cc:108
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:116
XrdOssArcConfig Config
Definition: XrdOssArc.cc:68
XrdSysMutex stageMtx
XrdScheduler * schedP
Definition: XrdOssArc.cc:66
std::set< ActInfo *, decltype(&cmpLess)> Active & cmpLess
std::queue< const char * > Pending
XrdSysError Elog(0, "OssArc_")
XrdSysMutex schedMtx
ActInfo(const char *p, bool cpy=false)