XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 #include "XrdOuc/XrdOucUtils.hh"
50 
52 #include "XrdSys/XrdSysPageSize.hh"
53 #include "XrdSys/XrdSysPthread.hh"
54 
55 #include <sstream>
56 #include <memory>
57 #include <numeric>
58 #include <sys/time.h>
59 #include <uuid/uuid.h>
60 #include <mutex>
61 
62 namespace
63 {
64  //----------------------------------------------------------------------------
65  // Helper callback for handling PgRead responses
66  //----------------------------------------------------------------------------
67  class PgReadHandler : public XrdCl::ResponseHandler
68  {
69  friend class PgReadRetryHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
74  // Constructor
75  //------------------------------------------------------------------------
76  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77  XrdCl::ResponseHandler *userHandler,
78  uint64_t orgOffset ) :
79  stateHandler( stateHandler ),
80  userHandler( userHandler ),
81  orgOffset( orgOffset ),
82  maincall( true ),
83  retrycnt( 0 ),
84  nbrepair( 0 )
85  {
86  }
87 
88  //------------------------------------------------------------------------
89  // Handle the response
90  //------------------------------------------------------------------------
92  XrdCl::AnyObject *response,
93  XrdCl::HostList *hostList )
94  {
95  using namespace XrdCl;
96 
97  std::unique_lock<std::mutex> lck( mtx );
98 
99  if( !maincall )
100  {
101  //--------------------------------------------------------------------
102  // We are serving PgRead retry request
103  //--------------------------------------------------------------------
104  --retrycnt;
105  if( !status->IsOK() )
106  st.reset( status );
107  else
108  {
109  delete status; // by convention other args are null (see PgReadRetryHandler)
110  ++nbrepair; // update number of repaired pages
111  }
112 
113  if( retrycnt == 0 )
114  {
115  //------------------------------------------------------------------
116  // All retries came back
117  //------------------------------------------------------------------
118  if( st->IsOK() )
119  {
120  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121  pginf.SetNbRepair( nbrepair );
122  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123  }
124  else
125  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126  lck.unlock();
127  delete this;
128  }
129 
130  return;
131  }
132 
133  //----------------------------------------------------------------------
134  // We are serving main PgRead request
135  //----------------------------------------------------------------------
136  if( !status->IsOK() )
137  {
138  //--------------------------------------------------------------------
139  // The main PgRead request has failed
140  //--------------------------------------------------------------------
141  userHandler->HandleResponseWithHosts( status, response, hostList );
142  lck.unlock();
143  delete this;
144  return;
145  }
146 
147  maincall = false;
148 
149  //----------------------------------------------------------------------
150  // Do the integrity check
151  //----------------------------------------------------------------------
152  PageInfo *pginf = 0;
153  response->Get( pginf );
154 
155  uint64_t pgoff = pginf->GetOffset();
156  uint32_t bytesRead = pginf->GetLength();
157  std::vector<uint32_t> &cksums = pginf->GetCksums();
158  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161  if( pgsize > bytesRead ) pgsize = bytesRead;
162 
163  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164  {
165  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166  if( crcval != cksums[pgnb] )
167  {
168  Log *log = DefaultEnv::GetLog();
169  log->Info( FileMsg, "[%p@%s] Received corrupted page, will retry page #%zu.",
170  (void*)this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171 
172  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173  if( !st.IsOK())
174  {
175  *status = st; // the reason for this failure
176  break;
177  }
178  ++retrycnt; // update the retry counter
179  }
180 
181  bytesRead -= pgsize;
182  buffer += pgsize;
183  pgoff += pgsize;
184  pgsize = XrdSys::PageSize;
185  if( pgsize > bytesRead ) pgsize = bytesRead;
186  }
187 
188 
189  if( retrycnt == 0 )
190  {
191  //--------------------------------------------------------------------
192  // All went well!
193  //--------------------------------------------------------------------
194  userHandler->HandleResponseWithHosts( status, response, hostList );
195  lck.unlock();
196  delete this;
197  return;
198  }
199 
200  //----------------------------------------------------------------------
201  // We have to wait for retries!
202  //----------------------------------------------------------------------
203  resp.reset( response );
204  hosts.reset( hostList );
205  st.reset( status );
206  }
207 
208  void UpdateCksum( size_t pgnb, uint32_t crcval )
209  {
210  if( resp )
211  {
212  XrdCl::PageInfo *pginf = 0;
213  resp->Get( pginf );
214  pginf->GetCksums()[pgnb] = crcval;
215  }
216  }
217 
218  private:
219 
220  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221  XrdCl::ResponseHandler *userHandler;
222  uint64_t orgOffset;
223 
224  std::unique_ptr<XrdCl::AnyObject> resp;
225  std::unique_ptr<XrdCl::HostList> hosts;
226  std::unique_ptr<XrdCl::XRootDStatus> st;
227 
228  std::mutex mtx;
229  bool maincall;
230  size_t retrycnt;
231  size_t nbrepair;
232 
233  };
234 
235  //----------------------------------------------------------------------------
236  // Helper callback for handling PgRead retries
237  //----------------------------------------------------------------------------
238  class PgReadRetryHandler : public XrdCl::ResponseHandler
239  {
240  public:
241 
242  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243  pgnb( pgnb )
244  {
245 
246  }
247 
248  //------------------------------------------------------------------------
249  // Handle the response
250  //------------------------------------------------------------------------
251  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252  XrdCl::AnyObject *response,
253  XrdCl::HostList *hostList )
254  {
255  using namespace XrdCl;
256 
257  if( !status->IsOK() )
258  {
259  Log *log = DefaultEnv::GetLog();
260  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
261  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263  delete this;
264  return;
265  }
266 
267  XrdCl::PageInfo *pginf = 0;
268  response->Get( pginf );
269  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270  {
271  Log *log = DefaultEnv::GetLog();
272  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
273  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274  // we retry a page at a time so the length cannot exceed 4KB
275  DeleteArgs( status, response, hostList );
276  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277  delete this;
278  return;
279  }
280 
281  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282  if( crcval != pginf->GetCksums().front() )
283  {
284  Log *log = DefaultEnv::GetLog();
285  log->Info( FileMsg, "[%p@%s] Failed to recover page #%zu.",
286  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287  DeleteArgs( status, response, hostList );
288  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289  delete this;
290  return;
291  }
292 
293  Log *log = DefaultEnv::GetLog();
294  log->Info( FileMsg, "[%p@%s] Successfully recovered page #%zu.",
295  (void*)this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296 
297  DeleteArgs( 0, response, hostList );
298  pgReadHandler->UpdateCksum( pgnb, crcval );
299  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300  delete this;
301  }
302 
303  private:
304 
305  inline void DeleteArgs( XrdCl::XRootDStatus *status,
306  XrdCl::AnyObject *response,
307  XrdCl::HostList *hostList )
308  {
309  delete status;
310  delete response;
311  delete hostList;
312  }
313 
314  PgReadHandler *pgReadHandler;
315  size_t pgnb;
316  };
317 
318  //----------------------------------------------------------------------------
319  // Handle PgRead substitution with ordinary Read
320  //----------------------------------------------------------------------------
321  class PgReadSubstitutionHandler : public XrdCl::ResponseHandler
322  {
323  public:
324 
325  //------------------------------------------------------------------------
326  // Constructor
327  //------------------------------------------------------------------------
328  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329  XrdCl::ResponseHandler *userHandler ) :
330  stateHandler( stateHandler ),
331  userHandler( userHandler )
332  {
333  }
334 
335  //------------------------------------------------------------------------
336  // Handle the response
337  //------------------------------------------------------------------------
338  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339  XrdCl::AnyObject *rdresp,
340  XrdCl::HostList *hostList )
341  {
342  if( !status->IsOK() )
343  {
344  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
345  delete this;
346  return;
347  }
348 
349  using namespace XrdCl;
350 
351  ChunkInfo *chunk = 0;
352  rdresp->Get( chunk );
353 
354  std::vector<uint32_t> cksums;
355  if( stateHandler->pIsChannelEncrypted )
356  {
357  size_t nbpages = chunk->length / XrdSys::PageSize;
358  if( chunk->length % XrdSys::PageSize )
359  ++nbpages;
360  cksums.reserve( nbpages );
361 
362  size_t size = chunk->length;
363  char *buffer = reinterpret_cast<char*>( chunk->buffer );
364 
365  for( size_t pg = 0; pg < nbpages; ++pg )
366  {
367  size_t pgsize = XrdSys::PageSize;
368  if( pgsize > size ) pgsize = size;
369  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
370  cksums.push_back( crcval );
371  buffer += pgsize;
372  size -= pgsize;
373  }
374  }
375 
376  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
377  chunk->buffer, std::move( cksums ) );
378  delete rdresp;
379  AnyObject *response = new AnyObject();
380  response->Set( pages );
381  userHandler->HandleResponseWithHosts( status, response, hostList );
382 
383  delete this;
384  }
385 
386  private:
387 
388  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389  XrdCl::ResponseHandler *userHandler;
390  };
391 
392  //----------------------------------------------------------------------------
393  // Object that does things to the FileStateHandler when kXR_open returns
394  // and then calls the user handler
395  //----------------------------------------------------------------------------
396  class OpenHandler: public XrdCl::ResponseHandler
397  {
398  public:
399  //------------------------------------------------------------------------
400  // Constructor
401  //------------------------------------------------------------------------
402  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403  XrdCl::ResponseHandler *userHandler ):
404  pStateHandler( stateHandler ),
405  pUserHandler( userHandler )
406  {
407  }
408 
409  //------------------------------------------------------------------------
410  // Handle the response
411  //------------------------------------------------------------------------
412  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413  XrdCl::AnyObject *response,
414  XrdCl::HostList *hostList )
415  {
416  using namespace XrdCl;
417 
418  //----------------------------------------------------------------------
419  // Extract the statistics info
420  //----------------------------------------------------------------------
421  OpenInfo *openInfo = 0;
422  if( status->IsOK() )
423  response->Get( openInfo );
424 #ifdef WITH_XRDEC
425  else
426  //--------------------------------------------------------------------
427  // Handle EC redirect
428  //--------------------------------------------------------------------
429  if( status->code == errRedirect )
430  {
431  std::string ecurl = status->GetErrorMessage();
432  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
433  if( ecHandler && pStateHandler->NeedFileTempl() )
434  {
435  delete status;
436  status = new XRootDStatus( stError, errNotSupported, 0,
437  "File template not supported with Ec" );
438  delete ecHandler;
439  ecHandler = 0;
440  }
441  else if( ecHandler )
442  {
443  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
444  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
445  return;
446  }
447  }
448 #endif
449  //----------------------------------------------------------------------
450  // Notify the state handler and the client and say bye bye
451  //----------------------------------------------------------------------
452  pStateHandler->OnOpen( status, openInfo, hostList );
453  delete response;
454  if( pUserHandler )
455  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
456  else
457  {
458  delete status;
459  delete hostList;
460  }
461  delete this;
462  }
463 
464  private:
465  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
466  XrdCl::ResponseHandler *pUserHandler;
467  };
468 
469  //----------------------------------------------------------------------------
470  // Object that does things to the FileStateHandler when kXR_close returns
471  // and then calls the user handler
472  //----------------------------------------------------------------------------
473  class CloseHandler: public XrdCl::ResponseHandler
474  {
475  public:
476  //------------------------------------------------------------------------
477  // Constructor
478  //------------------------------------------------------------------------
479  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
480  XrdCl::ResponseHandler *userHandler,
481  XrdCl::Message *message ):
482  pStateHandler( stateHandler ),
483  pUserHandler( userHandler ),
484  pMessage( message )
485  {
486  }
487 
488  //------------------------------------------------------------------------
490  //------------------------------------------------------------------------
491  virtual ~CloseHandler()
492  {
493  delete pMessage;
494  }
495 
496  //------------------------------------------------------------------------
497  // Handle the response
498  //------------------------------------------------------------------------
499  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
500  XrdCl::AnyObject *response,
501  XrdCl::HostList *hostList )
502  {
503  pStateHandler->OnClose( status );
504  if( pUserHandler )
505  pUserHandler->HandleResponseWithHosts( status, response, hostList );
506  else
507  {
508  delete response;
509  delete status;
510  delete hostList;
511  }
512 
513  delete this;
514  }
515 
516  private:
517  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
518  XrdCl::ResponseHandler *pUserHandler;
519  XrdCl::Message *pMessage;
520  };
521 
522  //----------------------------------------------------------------------------
523  // Stateful message handler
524  //----------------------------------------------------------------------------
525  class StatefulHandler: public XrdCl::ResponseHandler
526  {
527  public:
528  //------------------------------------------------------------------------
529  // Constructor
530  //------------------------------------------------------------------------
531  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
532  XrdCl::ResponseHandler *userHandler,
533  XrdCl::Message *message,
534  const XrdCl::MessageSendParams &sendParams ):
535  pStateHandler( stateHandler ),
536  pUserHandler( userHandler ),
537  pMessage( message ),
538  pSendParams( sendParams )
539  {
540  }
541 
542  //------------------------------------------------------------------------
543  // Destructor
544  //------------------------------------------------------------------------
545  virtual ~StatefulHandler()
546  {
547  delete pMessage;
548  delete pSendParams.chunkList;
549  delete pSendParams.kbuff;
550  }
551 
552  //------------------------------------------------------------------------
553  // Handle the response
554  //------------------------------------------------------------------------
555  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
556  XrdCl::AnyObject *response,
557  XrdCl::HostList *hostList )
558  {
559  using namespace XrdCl;
560  std::unique_ptr<AnyObject> responsePtr( response );
561  pSendParams.hostList = hostList;
562 
563  //----------------------------------------------------------------------
564  // Houston we have a problem...
565  //----------------------------------------------------------------------
566  if( !status->IsOK() )
567  {
568  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
569  return;
570  }
571 
572  //----------------------------------------------------------------------
573  // We're clear
574  //----------------------------------------------------------------------
575  responsePtr.release();
576  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
577  if( pUserHandler )
578  pUserHandler->HandleResponseWithHosts( status, response, hostList );
579  else
580  {
581  delete status,
582  delete response;
583  delete hostList;
584  }
585  delete this;
586  }
587 
588  //------------------------------------------------------------------------
590  //------------------------------------------------------------------------
591  XrdCl::ResponseHandler *GetUserHandler()
592  {
593  return pUserHandler;
594  }
595 
596  private:
597  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
598  XrdCl::ResponseHandler *pUserHandler;
599  XrdCl::Message *pMessage;
600  XrdCl::MessageSendParams pSendParams;
601  };
602 
603  //----------------------------------------------------------------------------
604  // Release-buffer Handler
605  //----------------------------------------------------------------------------
606  class ReleaseBufferHandler: public XrdCl::ResponseHandler
607  {
608  public:
609 
610  //------------------------------------------------------------------------
611  // Constructor
612  //------------------------------------------------------------------------
613  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
614  buffer( std::move( buffer ) ),
615  handler( handler )
616  {
617  }
618 
619  //------------------------------------------------------------------------
620  // Handle the response
621  //------------------------------------------------------------------------
622  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
623  XrdCl::AnyObject *response,
624  XrdCl::HostList *hostList )
625  {
626  if (handler)
627  handler->HandleResponseWithHosts( status, response, hostList );
628  }
629 
630  //------------------------------------------------------------------------
631  // Get the underlying buffer
632  //------------------------------------------------------------------------
633  XrdCl::Buffer& GetBuffer()
634  {
635  return buffer;
636  }
637 
638  private:
639  XrdCl::Buffer buffer;
640  XrdCl::ResponseHandler *handler;
641  };
642 }
643 
644 namespace XrdCl
645 {
646  //----------------------------------------------------------------------------
647  // Constructor
648  //----------------------------------------------------------------------------
649  FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
650  pFileState( Closed ),
651  pStatInfo( 0 ),
652  pFileUrl( 0 ),
653  pDataServer( 0 ),
654  pLoadBalancer( 0 ),
655  pStateRedirect( 0 ),
656  pWrtRecoveryRedir( 0 ),
657  pFileHandle( 0 ),
658  pOpenMode( 0 ),
659  pOpenFlags( OpenFlags::None ),
660  pSessionId( 0 ),
661  pDoRecoverRead( true ),
662  pDoRecoverWrite( true ),
663  pFollowRedirects( true ),
664  pUseVirtRedirector( true ),
665  pIsChannelEncrypted( false ),
666  pAllowBundledClose( false ),
667  pPlugin( plugin )
668  {
669  pFileHandle = new uint8_t[4];
670  ResetMonitoringVars();
673  pLFileHandler = new LocalFileHandler();
674  }
675 
676  //------------------------------------------------------------------------
681  //------------------------------------------------------------------------
682  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
683  pFileState( Closed ),
684  pStatInfo( 0 ),
685  pFileUrl( 0 ),
686  pDataServer( 0 ),
687  pLoadBalancer( 0 ),
688  pStateRedirect( 0 ),
689  pWrtRecoveryRedir( 0 ),
690  pFileHandle( 0 ),
691  pOpenMode( 0 ),
692  pOpenFlags( OpenFlags::None ),
693  pSessionId( 0 ),
694  pDoRecoverRead( true ),
695  pDoRecoverWrite( true ),
696  pFollowRedirects( true ),
697  pUseVirtRedirector( useVirtRedirector ),
698  pAllowBundledClose( false ),
699  pPlugin( plugin )
700  {
701  pFileHandle = new uint8_t[4];
702  ResetMonitoringVars();
705  pLFileHandler = new LocalFileHandler();
706  }
707 
708  //----------------------------------------------------------------------------
709  // Destructor
710  //----------------------------------------------------------------------------
712  {
  // Destructor body. In order: drops the file-instance count held on the
  // data server connection, reports a monitoring close if the file is not in
  // the Closed state, releases the metalink entry of the file URL, and frees
  // the members owned by this object.
  // NOTE(review): the line carrying the destructor signature (listing line
  // 711) is absent from this text - confirm against the original file.
713  //--------------------------------------------------------------------------
714  // This, in principle, should never ever happen. Except for the case
715  // when we're interfaced with ROOT that may call this destructor from
716  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
717  // has been finalized by the linker. So, if we don't have the log object
718  // at this point we just give up the hope.
719  //--------------------------------------------------------------------------
720  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
721  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
722 
  // NOTE(review): listing lines 723-724 and 726-727 are absent from this
  // text; statements may be missing here - confirm against the original file.
725 
728 
  // A file that is still not Closed gets a monitoring close with a
  // default-constructed status, then the monitoring variables are reset.
729  if( pFileState != Closed && DefaultEnv::GetLog() )
730  {
731  XRootDStatus st;
732  MonitorClose( &st );
733  ResetMonitoringVars();
734  }
735 
736  // check if the logger is still there, this is only for root, as root might
737  // have unload us already so in this case we don't want to do anything
738  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
739  {
  // NOTE(review): 'registry' is not declared in the visible lines (listing
  // line 740 is absent) - confirm its declaration against the original file.
741  registry.Release( *pFileUrl );
742  }
743 
  // Members owned by this object; pFileHandle is an array allocated with
  // new[] in the constructors, hence delete [].
744  delete pStatInfo;
745  delete pFileUrl;
746  delete pDataServer;
747  delete pLoadBalancer;
748  delete [] pFileHandle;
749  delete pLFileHandler;
750  }
751 
752  //----------------------------------------------------------------------------
753  // Open with file template
754  //----------------------------------------------------------------------------
756  std::shared_ptr<FileStateHandler> &self,
757  ExportedFileTemplate *templ,
758  const std::string &url,
759  OpenFlags::Flags flags,
760  uint16_t mode,
761  ResponseHandler *handler,
762  time_t timeout )
763  {
  // Open a file using a file template: validates 'templ', copies its
  // pTemplateFileWp into 'self' and then delegates to OpenImpl with the
  // remaining arguments unchanged.
  // Returns errInvalidArgs when 'templ' is null or is not a
  // FileStateHandlerTemplate, otherwise whatever OpenImpl returns.
  // NOTE(review): the first line of the signature (listing line 755) is
  // absent from this text - confirm against the original file.
764  if( !templ )
765  return XRootDStatus( stError, errInvalidArgs, 0, "Template file not available" );
766 
  // only templates of the concrete FileStateHandlerTemplate type are accepted
767  FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>( templ );
768  if( !fht )
769  return XRootDStatus( stError, errInvalidArgs, 0, "Template file invalid" );
770 
771  self->pTemplateFileWp = fht->pTemplateFileWp;
772 
773  return OpenImpl( self, url, flags, mode, handler, timeout );
774  }
775 
776  //----------------------------------------------------------------------------
777  // Open the file pointed to by the given URL
778  //----------------------------------------------------------------------------
779  XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
780  const std::string &url,
781  OpenFlags::Flags flags,
782  uint16_t mode,
783  ResponseHandler *handler,
784  time_t timeout )
785  {
786  self->pTemplateFileWp.reset();
787  return OpenImpl( self, url, flags, mode, handler, timeout );
788  }
789 
790  //----------------------------------------------------------------------------
791  // Most of Open implementation, used by Open and OpenUsingTemplate
792  //----------------------------------------------------------------------------
793  XRootDStatus FileStateHandler::OpenImpl( std::shared_ptr<FileStateHandler> &self,
794  const std::string &url,
795  OpenFlags::Flags flags,
796  uint16_t mode,
797  ResponseHandler *handler,
798  time_t timeout )
799  {
  // Common part of Open and OpenUsingTemplate: checks that the file state
  // allows an open, replaces the stored file URL, builds the kXR_open
  // request and issues it with an OpenHandler wrapping 'handler'.
  // Returns the status of issuing the request. On an invalid URL or a failed
  // FillFhTempl / IssueRequest the state goes back to Closed and the status
  // is also stored in pStatus.
  // scopedLock is an XrdSysMutexHelper on self->pMutex - presumably held
  // until the function returns.
800  XrdSysMutexHelper scopedLock( self->pMutex );
801 
802  //--------------------------------------------------------------------------
803  // Check if we can proceed
804  //--------------------------------------------------------------------------
805  if( self->pFileState == Error )
806  return self->pStatus;
807 
  // NOTE(review): listing line 809 is absent from this text. As shown, this
  // 'if' has no statement of its own and would govern the following 'if' -
  // confirm the missing statement against the original file.
808  if( self->pFileState == OpenInProgress )
810 
811  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
812  self->pFileState == Recovering )
813  return XRootDStatus( stError, errInvalidOp );
814 
815  self->pFileState = OpenInProgress;
816 
817  //--------------------------------------------------------------------------
818  // Check if the parameters are valid
819  //--------------------------------------------------------------------------
820  Log *log = DefaultEnv::GetLog();
821 
  // a URL left from a previous open is dropped before the new one is stored
822  if( self->pFileUrl )
823  {
824  if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
825  {
  // NOTE(review): 'registry' is not declared in the visible lines (listing
  // line 826 is absent) - confirm its declaration against the original file.
827  registry.Release( *self->pFileUrl );
828  }
829  delete self->pFileUrl;
830  self->pFileUrl = 0;
831  }
832 
833  self->pFileUrl = new URL( url );
834 
835  //--------------------------------------------------------------------------
836  // Add unique uuid to each open request so replays due to error/timeout
837  // recovery can be correctly handled.
838  //--------------------------------------------------------------------------
839  URL::ParamsMap cgi = self->pFileUrl->GetParams();
840  uuid_t uuid;
  // 37 bytes: uuid_unparse writes a 36-character string plus the trailing NUL
841  char requuid[37]= {0};
842  uuid_generate( uuid );
843  uuid_unparse( uuid, requuid );
844  cgi["xrdcl.requuid"] = requuid;
845  self->pFileUrl->SetParams( cgi );
846 
  // the validity check happens after the uuid parameter has been added
847  if( !self->pFileUrl->IsValid() )
848  {
849  log->Error( FileMsg, "[%p@%s] Trying to open invalid url: %s",
850  (void*)self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
851  self->pStatus = XRootDStatus( stError, errInvalidArgs );
852  self->pFileState = Closed;
853  return self->pStatus;
854  }
855 
856  //--------------------------------------------------------------------------
857  // Check if the recovery procedures should be enabled
858  //--------------------------------------------------------------------------
  // Recovery is switched off by the URL parameter being "false"; a flag that
  // is already false stays false (and is logged again) on every open.
859  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
860  URL::ParamsMap::const_iterator it;
861  it = urlParams.find( "xrdcl.recover-reads" );
862  if( (it != urlParams.end() && it->second == "false") ||
863  !self->pDoRecoverRead )
864  {
865  self->pDoRecoverRead = false;
866  log->Debug( FileMsg, "[%p@%s] Read recovery procedures are disabled",
867  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
868  }
869 
870  it = urlParams.find( "xrdcl.recover-writes" );
871  if( (it != urlParams.end() && it->second == "false") ||
872  !self->pDoRecoverWrite )
873  {
874  self->pDoRecoverWrite = false;
875  log->Debug( FileMsg, "[%p@%s] Write recovery procedures are disabled",
876  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
877  }
878 
879  //--------------------------------------------------------------------------
880  // Open the file
881  //--------------------------------------------------------------------------
882  log->Debug( FileMsg, "[%p@%s] Sending an open command", (void*)self.get(),
883  self->pFileUrl->GetObfuscatedURL().c_str() );
884 
885  self->pOpenMode = mode;
886  self->pOpenFlags = flags;
  // the open handler is deleted below on every path where the request is not
  // issued
887  OpenHandler *openHandler = new OpenHandler( self, handler );
888 
889  Message *msg;
890  ClientOpenRequest *req;
891  std::string path = self->pFileUrl->GetPathWithFilteredParams();
892  MessageUtils::CreateRequest( msg, req, path.length() );
893 
894  req->requestid = kXR_open;
895  req->mode = mode;
  // only the low 16 bits of the flags go on the wire; kXR_async and
  // kXR_retstat are always requested
896  req->options = (flags&0xffff) | kXR_async | kXR_retstat;
897  req->dlen = path.length();
898  URL sendUrl;
899  XRootDStatus st = FillFhTempl( self, *self->pFileUrl, msg, sendUrl );
900  if( !st.IsOK() )
901  {
  // NOTE(review): 'msg' is not freed on this path in the visible lines -
  // confirm who owns it at this point.
902  delete openHandler;
903  self->pStatus = st;
904  self->pFileState = Closed;
905  return st;
906  }
  // the path is written at offset 24 - presumably right after the request
  // header; TODO confirm
907  msg->Append( path.c_str(), path.length(), 24 );
908 
  // NOTE(review): listing lines 909 and 912 are absent from this text;
  // statements may be missing around the parameter set-up - confirm against
  // the original file.
910  MessageSendParams params; params.timeout = timeout;
911  params.followRedirects = self->pFollowRedirects;
913 
914  st = self->IssueRequest( sendUrl, msg, openHandler, params );
915 
916  if( !st.IsOK() )
917  {
918  delete openHandler;
919  self->pStatus = st;
920  self->pFileState = Closed;
921  return st;
922  }
923  return st;
924  }
925 
926  //----------------------------------------------------------------------------
927  // Close the file object
     //
     // Takes the handler's mutex, validates the current file state and, when a
     // close is allowed, switches the state to CloseInProgress and issues a
     // kXR_close request for the current file handle to the data server.
     //
     // @param self    the file state handler to operate on
     // @param handler response handler passed to the CloseHandler created for
     //                this request
     // @param timeout copied into the send parameters of the request
     // @return        pStatus if the file is in the Error state; stOK with
     //                suAlreadyDone if it is already Closed; errInvalidOp if an
     //                open or a recovery is in progress, or if requests are
     //                still in flight while bundled close is not allowed;
     //                otherwise the outcome of IssueRequest as handled at the
     //                end of the body
928  //----------------------------------------------------------------------------
929  XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
930  ResponseHandler *handler,
931  time_t timeout )
932  {
     // Hold the handler's mutex for the whole call
933  XrdSysMutexHelper scopedLock( self->pMutex );
934 
935  //--------------------------------------------------------------------------
936  // Check if we can proceed
937  //--------------------------------------------------------------------------
     // A file in the Error state reports the stored status
938  if( self->pFileState == Error )
939  return self->pStatus;
940 
     // NOTE(review): the embedded listing numbering jumps from 941 to 943, so
     // the statement guarded by this check is not visible here - presumably it
     // returns an "in progress" status; verify against the original source.
941  if( self->pFileState == CloseInProgress )
943 
     // Closing an already closed file is reported as OK / already done
944  if( self->pFileState == Closed )
945  return XRootDStatus( stOK, suAlreadyDone );
946 
     // A close cannot be issued while an open or a recovery is in progress
947  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
948  return XRootDStatus( stError, errInvalidOp );
949 
     // Unless bundled close is allowed, refuse to close while pInTheFly is not
     // empty
950  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
951  return XRootDStatus( stError, errInvalidOp );
952 
953  self->pFileState = CloseInProgress;
954 
955  Log *log = DefaultEnv::GetLog();
956  log->Debug( FileMsg, "[%p@%s] Sending a close command for handle %#x to %s",
957  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
958  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
959 
960  //--------------------------------------------------------------------------
961  // Close the file
962  //--------------------------------------------------------------------------
963  Message *msg;
964  ClientCloseRequest *req;
965  MessageUtils::CreateRequest( msg, req );
966 
967  req->requestid = kXR_close;
     // Only the first 4 bytes of pFileHandle are copied into the request
968  memcpy( req->fhandle, self->pFileHandle, 4 );
969 
     // NOTE(review): listing numbering skips 970 and 977 in this section; any
     // statements on those lines are not visible here.
971  msg->SetSessionId( self->pSessionId );
972  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
973  MessageSendParams params;
974  params.timeout = timeout;
975  params.followRedirects = false;
976  params.stateful = true;
978 
979  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );
980 
981  if( !st.IsOK() )
982  {
983  // an invalid-session error means the connection to the server has been
984  // closed, which in turn means that the server closed the file already
     // NOTE(review): listing numbering skips 986 inside the condition below,
     // so one or more error codes may be missing from the visible list.
985  if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
987  st.code == errPollerError || st.code == errSocketError )
988  {
     // Treat the file as closed and report success
989  self->pFileState = Closed;
990  ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
991  nullptr, nullptr );
     // NOTE(review): listing numbering skips 992; `job` is not visibly used
     // after its creation - presumably it is queued for execution on the
     // missing line; verify against the original source.
993  return XRootDStatus();
994  }
995 
     // Any other failure: release the close handler, remember the status and
     // put the file into the Error state
996  delete closeHandler;
997  self->pStatus = st;
998  self->pFileState = Error;
999  return st;
1000  }
1001  return st;
1002  }
1003 
1004  //----------------------------------------------------------------------------
1005  // Stat the file
      //
      // With force == false the cached StatInfo is copied and delivered to the
      // handler directly; with force == true a kXR_stat request carrying the
      // file handle is built and handed to SendOrQueue.
      //
      // @param self    the file state handler to operate on
      // @param force   false: answer from the cached pStatInfo; true: issue a
      //                new request
      // @param handler response handler; called directly on the cached path,
      //                wrapped in a StatefulHandler otherwise
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, an OK status on
      //                the cached path, otherwise the result of SendOrQueue
1006  //----------------------------------------------------------------------------
1007  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
1008  bool force,
1009  ResponseHandler *handler,
1010  time_t timeout )
1011  {
1012  XrdSysMutexHelper scopedLock( self->pMutex );
1013 
1014  if( self->pFileState == Error ) return self->pStatus;
1015 
1016  if( self->pFileState != Opened && self->pFileState != Recovering )
1017  return XRootDStatus( stError, errInvalidOp );
1018 
1019  //--------------------------------------------------------------------------
1020  // Return the cached info
1021  //--------------------------------------------------------------------------
1022  if( !force )
1023  {
      // assumes pStatInfo is non-null whenever the file is Opened or
      // Recovering - TODO confirm
1024  AnyObject *obj = new AnyObject();
1025  obj->Set( new StatInfo( *self->pStatInfo ) );
      // NOTE(review): when handler is null nothing visibly takes ownership of
      // `obj`, so it appears to be leaked on this path.
1026  if (handler)
1027  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
1028  return XRootDStatus();
1029  }
1030 
1031  Log *log = DefaultEnv::GetLog();
1032  log->Debug( FileMsg, "[%p@%s] Sending a stat command for handle %#x to %s",
1033  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1034  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1035 
1036  //--------------------------------------------------------------------------
1037  // Issue a new stat request
1038  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
1039  // stat the path
      // NOTE(review): the visible code below fills req->fhandle and never uses
      // `path`, so the remark above looks outdated.
1040  //--------------------------------------------------------------------------
1041  Message *msg;
1042  ClientStatRequest *req;
1043  std::string path = self->pFileUrl->GetPath();
1044  MessageUtils::CreateRequest( msg, req );
1045 
1046  req->requestid = kXR_stat;
1047  memcpy( req->fhandle, self->pFileHandle, 4 );
1048 
1049  MessageSendParams params;
1050  params.timeout = timeout;
1051  params.followRedirects = false;
1052  params.stateful = true;
      // NOTE(review): listing numbering skips 1053 and 1055; statements on
      // those lines are not visible here.
1054 
1056  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1057 
1058  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1059  }
1060 
1061  //----------------------------------------------------------------------------
1062  // Preread scattered data tracts in one operation - async
      //
      // Builds a kXR_read request with offset 0 and rlen 0 whose payload is
      // 8 bytes followed by one readahead_list entry per tract, and hands it to
      // SendOrQueue.
      //
      // @param self    the file state handler to operate on
      // @param tracts  list of (offset, length) tracts to preread
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1063  //----------------------------------------------------------------------------
1064  XRootDStatus FileStateHandler::PreRead( std::shared_ptr<FileStateHandler> &self,
1065  const TractList &tracts,
1066  ResponseHandler *handler,
1067  time_t timeout )
1068  {
1069  //--------------------------------------------------------------------------
1070  // Sanity check
1071  //--------------------------------------------------------------------------
1072  XrdSysMutexHelper scopedLock( self->pMutex );
1073 
1074  if( self->pFileState == Error ) return self->pStatus;
1075 
1076  if( self->pFileState != Opened && self->pFileState != Recovering )
1077  return XRootDStatus( stError, errInvalidOp );
1078 
1079  Log *log = DefaultEnv::GetLog();
1080  log->Debug( FileMsg, "[%p@%s] Sending an read+preread command for handle %#x to %s",
1081  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1082  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1083 
1084  //--------------------------------------------------------------------------
1085  // Build the message
1086  //--------------------------------------------------------------------------
1087  Message *msg;
1088  ClientReadRequest *req;
      // Payload size: one readahead_list per tract plus 8 extra bytes
1089  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*tracts.size() + 8 );
1090 
1091  req->requestid = kXR_read;
1092  req->offset = 0;
1093  req->rlen = 0;
1094  memcpy( req->fhandle, self->pFileHandle, 4 );
1095  req->dlen = sizeof(readahead_list)*tracts.size() + 8;
1096 
      // The read itself has zero length; a single zero-length chunk pointing at
      // a function-local static buffer is registered for it
1097  static char dummyBuff[8];
1098  ChunkList *list = new ChunkList();
1099  list->push_back( ChunkInfo( 0, 0, dummyBuff ) );
1100 
1101  //--------------------------------------------------------------------------
1102  // Copy the tract info
1103  //--------------------------------------------------------------------------
      // The tract entries start at byte 24 + 8 of the message.
      // NOTE(review): the 8 payload bytes preceding them are not written by the
      // visible code - presumably zero-initialised by CreateRequest; confirm.
1104  readahead_list *dataTract = (readahead_list*)msg->GetBuffer( 24 + 8 );
1105  for( size_t i = 0; i < tracts.size(); ++i )
1106  {
1107  dataTract[i].rlen = tracts[i].length;
1108  dataTract[i].offset = tracts[i].offset;
1109  memcpy( dataTract[i].fhandle, req->fhandle, 4 );
1110  }
1111 
1112  //--------------------------------------------------------------------------
1113  // Send the message
1114  //--------------------------------------------------------------------------
1115  MessageSendParams params;
1116  params.timeout = timeout;
1117  params.followRedirects = false;
1118  params.stateful = true;
1119  params.chunkList = list;
      // NOTE(review): listing numbering skips 1120 and 1122; statements on
      // those lines are not visible here.
1121 
1123  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1124 
1125  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1126  }
1127 
1128  //----------------------------------------------------------------------------
1129  // Read a data chunk at a given offset
      //
      // Builds a kXR_read request for the current file handle and hands it to
      // SendOrQueue; the response handler is wrapped in a StatefulHandler.
      //
      // @param self    the file state handler to operate on
      // @param offset  file offset to read from
      // @param size    number of bytes requested
      // @param buffer  destination registered in the chunk list of the request
      // @param handler response handler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1130  //----------------------------------------------------------------------------
1131  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1132  uint64_t offset,
1133  uint32_t size,
1134  void *buffer,
1135  ResponseHandler *handler,
1136  time_t timeout )
1137  {
1138  XrdSysMutexHelper scopedLock( self->pMutex );
1139 
1140  if( self->pFileState == Error ) return self->pStatus;
1141 
1142  if( self->pFileState != Opened && self->pFileState != Recovering )
1143  return XRootDStatus( stError, errInvalidOp );
1144 
1145  Log *log = DefaultEnv::GetLog();
1146  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1147  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1148  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1149 
1150  Message *msg;
1151  ClientReadRequest *req;
1152  MessageUtils::CreateRequest( msg, req );
1153 
1154  req->requestid = kXR_read;
1155  req->offset = offset;
1156  req->rlen = size;
1157  memcpy( req->fhandle, self->pFileHandle, 4 );
1158 
      // A single chunk describing where the requested range is to be stored
1159  ChunkList *list = new ChunkList();
1160  list->push_back( ChunkInfo( offset, size, buffer ) );
1161 
      // NOTE(review): listing numbering skips 1162 and 1168; statements on
      // those lines are not visible here.
1163  MessageSendParams params;
1164  params.timeout = timeout;
1165  params.followRedirects = false;
1166  params.stateful = true;
1167  params.chunkList = list;
1169  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1170 
1171  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1172  }
1173 
1174  //------------------------------------------------------------------------
1175  // Read data pages at a given offset
      //
      // Decides whether the data server supports pgread (kXR_suppgrw flag set
      // and protocol version at least kXR_PROTPGRWVERSION). If it does not, the
      // request is substituted with an ordinary Read whose handler is wrapped
      // in a PgReadSubstitutionHandler; otherwise PgReadImpl is called with a
      // PgReadHandler.
      //
      // @param self    the file state handler to operate on
      // @param offset  file offset to read from
      // @param size    number of bytes requested
      // @param buffer  destination buffer
      // @param handler user response handler
      // @param timeout passed on to Read / PgReadImpl
      // @return        status returned by Read or PgReadImpl; on failure the
      //                wrapper handler created here is deleted
1176  //------------------------------------------------------------------------
1177  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1178  uint64_t offset,
1179  uint32_t size,
1180  void *buffer,
1181  ResponseHandler *handler,
1182  time_t timeout )
1183  {
1184  int issupported = true;
1185  AnyObject obj;
      // NOTE(review): listing numbering skips 1186; `st1`, which is tested
      // below, is not declared in the visible text - presumably it is the
      // status of the query that fills `obj` with the server flags; verify
      // against the original source.
1187  int protver = 0;
1188  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1189  if( st1.IsOK() && st2.IsOK() )
1190  {
      // obj is expected to hold an int with the server flags; the pointer
      // obtained from it is deleted here after use
1191  int *ptr = 0;
1192  obj.Get( ptr );
1193  issupported = ( ptr && (*ptr & kXR_suppgrw) ) && ( protver >= kXR_PROTPGRWVERSION );
1194  delete ptr;
1195  }
1196  else
      // If either query failed pgread is treated as unsupported
1197  issupported = false;
1198 
1199  if( !issupported )
1200  {
1201  DefaultEnv::GetLog()->Debug( FileMsg, "[%p@%s] PgRead not supported; substituting with Read.",
1202  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1203  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1204  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
      // The wrapper is released here only when the request was not accepted
1205  if( !st.IsOK() ) delete substitHandler;
1206  return st;
1207  }
1208 
1209  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1210  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1211  if( !st.IsOK() ) delete pgHandler;
1212  return st;
1213  }
1214 
1215  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1216  uint64_t offset,
1217  uint32_t size,
1218  size_t pgnb,
1219  void *buffer,
1220  PgReadHandler *handler,
1221  time_t timeout )
1222  {
1223  if( size > (uint32_t)XrdSys::PageSize )
1224  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1225  "PgRead retry size exceeded 4KB." );
1226 
1227  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1228  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1229  if( !st.IsOK() ) delete retryHandler;
1230  return st;
1231  }
1232 
//------------------------------------------------------------------------
// Build and submit a kXR_pgread request
//
// @param self    the file state handler to operate on
// @param offset  file offset to read from
// @param size    number of bytes requested
// @param buffer  destination registered in the chunk list of the request
// @param flags   stored in the reqflags field of the request arguments
// @param handler response handler, wrapped in a StatefulHandler
// @param timeout copied into the send parameters of the request
// @return        pStatus if the file is in the Error state, errInvalidOp
//                if it is neither Opened nor Recovering, otherwise the
//                result of SendOrQueue
//------------------------------------------------------------------------
1233  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1234  uint64_t offset,
1235  uint32_t size,
1236  void *buffer,
1237  uint16_t flags,
1238  ResponseHandler *handler,
1239  time_t timeout )
1240  {
1241  XrdSysMutexHelper scopedLock( self->pMutex );
1242 
1243  if( self->pFileState == Error ) return self->pStatus;
1244 
1245  if( self->pFileState != Opened && self->pFileState != Recovering )
1246  return XRootDStatus( stError, errInvalidOp );
1247 
1248  Log *log = DefaultEnv::GetLog();
1249  log->Debug( FileMsg, "[%p@%s] Sending a pgread command for handle %#x to %s",
1250  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1251  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1252 
1253  Message *msg;
1254  ClientPgReadRequest *req;
      // The message is created with room for the pgread arguments
1255  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1256 
1257  req->requestid = kXR_pgread;
1258  req->offset = offset;
1259  req->rlen = size;
1260  memcpy( req->fhandle, self->pFileHandle, 4 );
1261 
1262  //--------------------------------------------------------------------------
1263  // Now adjust the message size so it can hold PgRead arguments
1264  //--------------------------------------------------------------------------
      // The arguments are located right after the request structure; they are
      // zeroed first and then only reqflags is set. Both GetBuffer calls below
      // use the same offset.
1265  req->dlen = sizeof( ClientPgReadReqArgs );
1266  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1267  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1268  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1269  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1270  args->reqflags = flags;
1271 
1272  ChunkList *list = new ChunkList();
1273  list->push_back( ChunkInfo( offset, size, buffer ) );
1274 
      // NOTE(review): listing numbering skips 1275 and 1281; statements on
      // those lines are not visible here.
1276  MessageSendParams params;
1277  params.timeout = timeout;
1278  params.followRedirects = false;
1279  params.stateful = true;
1280  params.chunkList = list;
1282  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1283 
1284  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1285  }
1286 
1287  //----------------------------------------------------------------------------
1288  // Write a data chunk at a given offset - async
      //
      // Builds a kXR_write request for the current file handle and hands it to
      // SendOrQueue; the data itself is attached through the chunk list.
      //
      // @param self    the file state handler to operate on
      // @param offset  file offset to write at
      // @param size    number of bytes to write
      // @param buffer  data to be written
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1289  //----------------------------------------------------------------------------
1290  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1291  uint64_t offset,
1292  uint32_t size,
1293  const void *buffer,
1294  ResponseHandler *handler,
1295  time_t timeout )
1296  {
1297  XrdSysMutexHelper scopedLock( self->pMutex );
1298 
1299  if( self->pFileState == Error ) return self->pStatus;
1300 
1301  if( self->pFileState != Opened && self->pFileState != Recovering )
1302  return XRootDStatus( stError, errInvalidOp );
1303 
1304  Log *log = DefaultEnv::GetLog();
1305  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1306  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1307  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1308 
1309  Message *msg;
1310  ClientWriteRequest *req;
1311  MessageUtils::CreateRequest( msg, req );
1312 
1313  req->requestid = kXR_write;
1314  req->offset = offset;
      // dlen carries the size of the data that follows the request
1315  req->dlen = size;
1316  memcpy( req->fhandle, self->pFileHandle, 4 );
1317 
      // The chunk is registered with offset 0 (the file offset lives in the
      // request itself); constness of the user buffer is cast away to fit
      // ChunkInfo
1318  ChunkList *list = new ChunkList();
1319  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1320 
1321  MessageSendParams params;
1322  params.timeout = timeout;
1323  params.followRedirects = false;
1324  params.stateful = true;
1325  params.chunkList = list;
1326 
      // NOTE(review): listing numbering skips 1327 and 1329; statements on
      // those lines are not visible here.
1328 
1330  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1331 
1332  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1333  }
1334 
1335  //----------------------------------------------------------------------------
1336  // Write a data chunk at a given offset
1337  //----------------------------------------------------------------------------
1338  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1339  uint64_t offset,
1340  Buffer &&buffer,
1341  ResponseHandler *handler,
1342  time_t timeout )
1343  {
1344  //--------------------------------------------------------------------------
1345  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1346  // so fall back to normal write
1347  //--------------------------------------------------------------------------
1348  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1349  {
1350  Log *log = DefaultEnv::GetLog();
1351  log->Info( FileMsg, "[%p@%s] Buffer for handle %#x is not page aligned (4KB), "
1352  "cannot convert it to kernel space buffer.", (void*)self.get(),
1353  self->pFileUrl->GetObfuscatedURL().c_str(), *((uint32_t*)self->pFileHandle) );
1354 
1355  void *buff = buffer.GetBuffer();
1356  uint32_t size = buffer.GetSize();
1357  ReleaseBufferHandler *wrtHandler =
1358  new ReleaseBufferHandler( std::move( buffer ), handler );
1359  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1360  if( !st.IsOK() )
1361  {
1362  buffer = std::move( wrtHandler->GetBuffer() );
1363  delete wrtHandler;
1364  }
1365  return st;
1366  }
1367 
1368  //--------------------------------------------------------------------------
1369  // Transfer the data from user space to kernel space
1370  //--------------------------------------------------------------------------
1371  uint32_t length = buffer.GetSize();
1372  char *ubuff = buffer.Release();
1373 
1374  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1375  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1376  if( ret < 0 )
1377  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1378 
1379  //--------------------------------------------------------------------------
1380  // Now create a write request and enqueue it
1381  //--------------------------------------------------------------------------
1382  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1383  }
1384 
1385  //----------------------------------------------------------------------------
1386  // Write a data from a given file descriptor at a given offset - async
1387  //----------------------------------------------------------------------------
1388  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1389  uint64_t offset,
1390  uint32_t size,
1391  Optional<uint64_t> fdoff,
1392  int fd,
1393  ResponseHandler *handler,
1394  time_t timeout )
1395  {
1396  //--------------------------------------------------------------------------
1397  // Read the data from the file descriptor into a kernel buffer
1398  //--------------------------------------------------------------------------
1399  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1400  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1401  XrdSys::Read( fd, *kbuff, size );
1402  if( ret < 0 )
1403  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1404 
1405  //--------------------------------------------------------------------------
1406  // Now create a write request and enqueue it
1407  //--------------------------------------------------------------------------
1408  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1409  }
1410 
1411  //----------------------------------------------------------------------------
1412  // Write number of pages at a given offset - async
1413  //----------------------------------------------------------------------------
1414  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1415  uint64_t offset,
1416  uint32_t size,
1417  const void *buffer,
1418  std::vector<uint32_t> &cksums,
1419  ResponseHandler *handler,
1420  time_t timeout )
1421  {
1422  //--------------------------------------------------------------------------
1423  // Resolve timeout value
1424  //--------------------------------------------------------------------------
1425  if( timeout == 0 )
1426  {
1427  int val = DefaultRequestTimeout;
1428  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1429  timeout = val;
1430  }
1431 
1432  //--------------------------------------------------------------------------
1433  // Validate the digest vector size
1434  //--------------------------------------------------------------------------
1435  if( cksums.empty() )
1436  {
1437  const char *data = static_cast<const char*>( buffer );
1438  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1439  }
1440  else
1441  {
1442  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1443  if( crc32cCnt != cksums.size() )
1444  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1445  }
1446 
1447  //--------------------------------------------------------------------------
1448  // Create a context for PgWrite operation
1449  //--------------------------------------------------------------------------
1450  struct pgwrt_t
1451  {
1452  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1453  {
1454  }
1455 
1456  ~pgwrt_t()
1457  {
1458  if( handler )
1459  {
1460  // if all retries were successful no error status was set
1461  if( !status ) status = new XRootDStatus();
1462  handler->HandleResponse( status, nullptr );
1463  }
1464  }
1465 
1466  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1467  {
1468  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1469  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1470  }
1471 
1472  inline void SetStatus( XRootDStatus* s )
1473  {
1474  if( !status ) status = s;
1475  else delete s;
1476  }
1477 
1478  ResponseHandler *handler;
1479  XRootDStatus *status;
1480  };
1481  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1482 
1483  int fLen, lLen;
1484  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1485  uint32_t fstpglen = fLen;
1486 
1487  time_t start = ::time( nullptr );
1488  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1489  {
1490  std::unique_ptr<AnyObject> scoped( r );
1491  // if the request failed simply pass the status to the
1492  // user handler
1493  if( !s->IsOK() )
1494  {
1495  pgwrt->SetStatus( s );
1496  return; // pgwrt destructor will call the handler
1497  }
1498  // also if the request was sucessful and there were no
1499  // corrupted pages pass the status to the user handler
1500  RetryInfo *inf = nullptr;
1501  r->Get( inf );
1502  if( !inf->NeedRetry() )
1503  {
1504  pgwrt->SetStatus( s );
1505  return; // pgwrt destructor will call the handler
1506  }
1507  delete s;
1508  // first adjust the timeout value
1509  time_t elapsed = ::time( nullptr ) - start;
1510  if( elapsed >= timeout )
1511  {
1512  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1513  return; // pgwrt destructor will call the handler
1514  }
1515  else timeout -= elapsed;
1516  // retransmit the corrupted pages
1517  for( size_t i = 0; i < inf->Size(); ++i )
1518  {
1519  auto tpl = inf->At( i );
1520  uint64_t pgoff = std::get<0>( tpl );
1521  uint32_t pglen = std::get<1>( tpl );
1522  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1523  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1524  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1525  {
1526  std::unique_ptr<AnyObject> scoped( r );
1527  // if we failed simply set the status
1528  if( !s->IsOK() )
1529  {
1530  pgwrt->SetStatus( s );
1531  return; // the destructor will call the handler
1532  }
1533  delete s;
1534  // otherwise check if the data were not corrupted again
1535  RetryInfo *inf = nullptr;
1536  r->Get( inf );
1537  if( inf->NeedRetry() ) // so we failed in the end
1538  {
1539  DefaultEnv::GetLog()->Warning( FileMsg, "[%p@%s] Failed retransmitting corrupted "
1540  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1541  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1542  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1543  "Failed to retransmit corrupted page" ) );
1544  }
1545  else
1546  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Succesfuly retransmitted corrupted "
1547  "page: pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1548  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1549  } );
1550  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1551  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1552  DefaultEnv::GetLog()->Info( FileMsg, "[%p@%s] Retransmitting corrupted page: "
1553  "pgoff=%llu, pglen=%u, pgdigest=%u", (void*)self.get(),
1554  self->pFileUrl->GetObfuscatedURL().c_str(), (unsigned long long) pgoff, pglen, pgdigest );
1555  }
1556  } );
1557 
1558  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1559  if( !st.IsOK() )
1560  {
1561  pgwrt->handler = nullptr;
1562  delete h;
1563  }
1564  return st;
1565  }
1566 
1567  //------------------------------------------------------------------------
1568  // Write number of pages at a given offset - async
1569  //------------------------------------------------------------------------
1570  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1571  uint64_t offset,
1572  uint32_t size,
1573  const void *buffer,
1574  uint32_t digest,
1575  ResponseHandler *handler,
1576  time_t timeout )
1577  {
1578  std::vector<uint32_t> cksums{ digest };
1579  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1580  }
1581 
1582  //------------------------------------------------------------------------
1583  // Write number of pages at a given offset - async
      //
      // Builds a kXR_pgwrite request for the current file handle and hands it
      // to SendOrQueue; the data is attached through the chunk list and the
      // digests through the send parameters.
      //
      // @param self    the file state handler to operate on
      // @param offset  file offset to write at
      // @param size    number of data bytes to write
      // @param buffer  data to be written
      // @param cksums  crc32c digests; swapped into the send parameters, so
      //                on return it holds whatever the freshly created
      //                parameters contained
      // @param flags   stored in the reqflags field of the request
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1584  //------------------------------------------------------------------------
1585  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1586  uint64_t offset,
1587  uint32_t size,
1588  const void *buffer,
1589  std::vector<uint32_t> &cksums,
1590  kXR_char flags,
1591  ResponseHandler *handler,
1592  time_t timeout )
1593  {
1594  XrdSysMutexHelper scopedLock( self->pMutex );
1595 
1596  if( self->pFileState == Error ) return self->pStatus;
1597 
1598  if( self->pFileState != Opened && self->pFileState != Recovering )
1599  return XRootDStatus( stError, errInvalidOp );
1600 
1601  Log *log = DefaultEnv::GetLog();
1602  log->Debug( FileMsg, "[%p@%s] Sending a pgwrite command for handle %#x to %s",
1603  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1604  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1605 
1606  //--------------------------------------------------------------------------
1607  // Create the message
1608  //--------------------------------------------------------------------------
1609  Message *msg;
1610  ClientPgWriteRequest *req;
1611  MessageUtils::CreateRequest( msg, req );
1612 
1613  req->requestid = kXR_pgwrite;
1614  req->offset = offset;
      // The payload length covers the data plus one 4-byte digest per entry of
      // cksums
1615  req->dlen = size + cksums.size() * sizeof( uint32_t );
1616  req->reqflags = flags;
1617  memcpy( req->fhandle, self->pFileHandle, 4 );
1618 
1619  ChunkList *list = new ChunkList();
1620  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1621 
1622  MessageSendParams params;
1623  params.timeout = timeout;
1624  params.followRedirects = false;
1625  params.stateful = true;
1626  params.chunkList = list;
      // Takes the digests over from the caller's vector without copying them
1627  params.crc32cDigests.swap( cksums );
1628 
      // NOTE(review): listing numbering skips 1629 and 1631; statements on
      // those lines are not visible here.
1630 
1632  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1633 
1634  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1635  }
1636 
1637  //----------------------------------------------------------------------------
1638  // Commit all pending disk writes - async
      //
      // Builds a kXR_sync request for the current file handle and hands it to
      // SendOrQueue.
      //
      // @param self    the file state handler to operate on
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1639  //----------------------------------------------------------------------------
1640  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1641  ResponseHandler *handler,
1642  time_t timeout )
1643  {
1644  XrdSysMutexHelper scopedLock( self->pMutex );
1645 
1646  if( self->pFileState == Error ) return self->pStatus;
1647 
1648  if( self->pFileState != Opened && self->pFileState != Recovering )
1649  return XRootDStatus( stError, errInvalidOp );
1650 
1651  Log *log = DefaultEnv::GetLog();
1652  log->Debug( FileMsg, "[%p@%s] Sending a sync command for handle %#x to %s",
1653  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1654  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1655 
1656  Message *msg;
1657  ClientSyncRequest *req;
1658  MessageUtils::CreateRequest( msg, req );
1659 
1660  req->requestid = kXR_sync;
1661  memcpy( req->fhandle, self->pFileHandle, 4 );
1662 
1663  MessageSendParams params;
1664  params.timeout = timeout;
1665  params.followRedirects = false;
1666  params.stateful = true;
      // NOTE(review): listing numbering skips 1667 and 1669; statements on
      // those lines are not visible here.
1668 
1670  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1671 
1672  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1673  }
1674 
1675  //----------------------------------------------------------------------------
1676  // Truncate the file to a particular size - async
      //
      // Builds a kXR_truncate request for the current file handle and hands it
      // to SendOrQueue.
      //
      // @param self    the file state handler to operate on
      // @param size    requested file size
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1677  //----------------------------------------------------------------------------
1678  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1679  uint64_t size,
1680  ResponseHandler *handler,
1681  time_t timeout )
1682  {
1683  XrdSysMutexHelper scopedLock( self->pMutex );
1684 
1685  if( self->pFileState == Error ) return self->pStatus;
1686 
1687  if( self->pFileState != Opened && self->pFileState != Recovering )
1688  return XRootDStatus( stError, errInvalidOp );
1689 
1690  Log *log = DefaultEnv::GetLog();
1691  log->Debug( FileMsg, "[%p@%s] Sending a truncate command for handle %#x to %s",
1692  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1693  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1694 
1695  Message *msg;
1696  ClientTruncateRequest *req;
1697  MessageUtils::CreateRequest( msg, req );
1698 
1699  req->requestid = kXR_truncate;
1700  memcpy( req->fhandle, self->pFileHandle, 4 );
      // The requested size travels in the offset field of the request
1701  req->offset = size;
1702 
1703  MessageSendParams params;
1704  params.timeout = timeout;
1705  params.followRedirects = false;
1706  params.stateful = true;
      // NOTE(review): listing numbering skips 1707 and 1709; statements on
      // those lines are not visible here.
1708 
1710  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1711 
1712  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1713  }
1714 
1715  //----------------------------------------------------------------------------
1716  // Read scattered data chunks in one operation - async
      //
      // Builds a kXR_readv request whose payload holds one readahead_list entry
      // per chunk and hands it to SendOrQueue.
      //
      // @param self    the file state handler to operate on
      // @param chunks  list of chunks (offset, length, buffer) to read
      // @param buffer  if non-null, the chunks are placed back to back in this
      //                buffer in list order; if null, each chunk's own buffer
      //                is used
      // @param handler response handler, wrapped in a StatefulHandler
      // @param timeout copied into the send parameters of the request
      // @return        pStatus if the file is in the Error state, errInvalidOp
      //                if it is neither Opened nor Recovering, otherwise the
      //                result of SendOrQueue
1717  //----------------------------------------------------------------------------
1718  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1719  const ChunkList &chunks,
1720  void *buffer,
1721  ResponseHandler *handler,
1722  time_t timeout )
1723  {
1724  //--------------------------------------------------------------------------
1725  // Sanity check
1726  //--------------------------------------------------------------------------
1727  XrdSysMutexHelper scopedLock( self->pMutex );
1728 
1729  if( self->pFileState == Error ) return self->pStatus;
1730 
1731  if( self->pFileState != Opened && self->pFileState != Recovering )
1732  return XRootDStatus( stError, errInvalidOp );
1733 
1734  Log *log = DefaultEnv::GetLog();
1735  log->Debug( FileMsg, "[%p@%s] Sending a vector read command for handle %#x to %s",
1736  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1737  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1738 
1739  //--------------------------------------------------------------------------
1740  // Build the message
1741  //--------------------------------------------------------------------------
1742  Message *msg;
1743  ClientReadVRequest *req;
1744  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1745 
1746  req->requestid = kXR_readv;
1747  req->dlen = sizeof(readahead_list)*chunks.size();
1748 
      // `cursor` walks through the user supplied buffer (if any)
1749  ChunkList *list = new ChunkList();
1750  char *cursor = (char*)buffer;
1751 
1752  //--------------------------------------------------------------------------
1753  // Copy the chunk info
1754  //--------------------------------------------------------------------------
      // The chunk descriptors start at byte 24 of the message - presumably
      // right after the request header; TODO confirm
1755  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1756  for( size_t i = 0; i < chunks.size(); ++i )
1757  {
1758  dataChunk[i].rlen = chunks[i].length;
1759  dataChunk[i].offset = chunks[i].offset;
1760  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1761 
      // Pick the destination of this chunk: the next free part of the common
      // buffer, or the buffer given in the chunk itself
1762  void *chunkBuffer;
1763  if( cursor )
1764  {
1765  chunkBuffer = cursor;
1766  cursor += chunks[i].length;
1767  }
1768  else
1769  chunkBuffer = chunks[i].buffer;
1770 
1771  list->push_back( ChunkInfo( chunks[i].offset,
1772  chunks[i].length,
1773  chunkBuffer ) );
1774  }
1775 
1776  //--------------------------------------------------------------------------
1777  // Send the message
1778  //--------------------------------------------------------------------------
1779  MessageSendParams params;
1780  params.timeout = timeout;
1781  params.followRedirects = false;
1782  params.stateful = true;
1783  params.chunkList = list;
      // NOTE(review): listing numbering skips 1784 and 1786; statements on
      // those lines are not visible here.
1785 
1787  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1788 
1789  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1790  }
1791 
1792  //------------------------------------------------------------------------
1793  // Write scattered data chunks in one operation - async
      //
      // Builds a kXR_writev request whose payload holds one
      // XrdProto::write_list entry per element of 'chunks', and passes it to
      // SendOrQueue together with a ChunkList that names the buffers to send.
      //
      // self    - handler of the file being written
      // chunks  - offset / length / buffer of every chunk
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
1794  //------------------------------------------------------------------------
1795  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1796  const ChunkList &chunks,
1797  ResponseHandler *handler,
1798  time_t timeout )
1799  {
1800  //--------------------------------------------------------------------------
1801  // Sanity check
1802  //--------------------------------------------------------------------------
1803  XrdSysMutexHelper scopedLock( self->pMutex );
1804 
1805  if( self->pFileState == Error ) return self->pStatus;
1806 
1807  if( self->pFileState != Opened && self->pFileState != Recovering )
1808  return XRootDStatus( stError, errInvalidOp );
1809 
      // The first four bytes of pFileHandle are printed as one 32-bit value
1810  Log *log = DefaultEnv::GetLog();
1811  log->Debug( FileMsg, "[%p@%s] Sending a vector write command for handle %#x to %s",
1812  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1813  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1814 
1815  //--------------------------------------------------------------------------
1816  // Determine the size of the payload
1817  //--------------------------------------------------------------------------
1818 
1819  // the size of write vector
      // (descriptors only - the chunk data itself is not counted here)
1820  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1821 
1822  //--------------------------------------------------------------------------
1823  // Build the message
1824  //--------------------------------------------------------------------------
1825  Message *msg;
1826  ClientWriteVRequest *req;
1827  MessageUtils::CreateRequest( msg, req, payloadSize );
1828 
      // dlen is computed with the same expression as payloadSize above
1829  req->requestid = kXR_writev;
1830  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1831 
1832  ChunkList *list = new ChunkList();
1833 
1834  //--------------------------------------------------------------------------
1835  // Copy the chunk info
1836  //--------------------------------------------------------------------------
      // NOTE(review): 24 is presumably the size of the request header that
      // precedes the payload - confirm
1837  XrdProto::write_list *writeList =
1838  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1839 
1840 
1841 
1842  for( size_t i = 0; i < chunks.size(); ++i )
1843  {
1844  writeList[i].wlen = chunks[i].length;
1845  writeList[i].offset = chunks[i].offset;
1846  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1847 
1848  list->push_back( ChunkInfo( chunks[i].offset,
1849  chunks[i].length,
1850  chunks[i].buffer ) );
1851  }
1852 
1853  //--------------------------------------------------------------------------
1854  // Send the message
1855  //--------------------------------------------------------------------------
      // NOTE(review): 'list' is handed over through params.chunkList and is not
      // deleted in this function - presumably the send machinery owns it from
      // here on; confirm
1856  MessageSendParams params;
1857  params.timeout = timeout;
1858  params.followRedirects = false;
1859  params.stateful = true;
1860  params.chunkList = list;
      // NOTE(review): the listing's own numbering skips 1861 and 1863 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
1862 
1864  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1865 
1866  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1867  }
1868 
1869  //------------------------------------------------------------------------
1870  // Write scattered buffers in one operation - async
      //
      // Sends a single kXR_write at 'offset' whose data is the concatenation of
      // the non-empty iov entries, in array order.
      //
      // self    - handler of the file being written
      // offset  - file offset stored in the request
      // iov     - array of buffers to send
      // iovcnt  - number of entries in 'iov'
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
1871  //------------------------------------------------------------------------
1872  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1873  uint64_t offset,
1874  const struct iovec *iov,
1875  int iovcnt,
1876  ResponseHandler *handler,
1877  time_t timeout )
1878  {
1879  XrdSysMutexHelper scopedLock( self->pMutex );
1880 
1881  if( self->pFileState == Error ) return self->pStatus;
1882 
1883  if( self->pFileState != Opened && self->pFileState != Recovering )
1884  return XRootDStatus( stError, errInvalidOp );
1885 
      // The first four bytes of pFileHandle are printed as one 32-bit value
1886  Log *log = DefaultEnv::GetLog();
1887  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
1888  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1889  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1890 
1891  Message *msg;
1892  ClientWriteRequest *req;
1893  MessageUtils::CreateRequest( msg, req );
1894 
1895  ChunkList *list = new ChunkList();
1896 
      // Zero-length entries are dropped; every kept chunk is recorded with
      // offset 0 and only its length and buffer.
      // NOTE(review): 'size' is a uint32_t while iov_len is wider on common
      // platforms; a combined length above 32 bits would wrap - confirm that
      // callers bound the total
1897  uint32_t size = 0;
1898  for( int i = 0; i < iovcnt; ++i )
1899  {
1900  if( iov[i].iov_len == 0 ) continue;
1901  size += iov[i].iov_len;
1902  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1903  (char*)iov[i].iov_base ) );
1904  }
1905 
1906  req->requestid = kXR_write;
1907  req->offset = offset;
1908  req->dlen = size;
1909  memcpy( req->fhandle, self->pFileHandle, 4 );
1910 
      // NOTE(review): 'list' is handed over through params.chunkList and is not
      // deleted in this function - presumably the send machinery owns it from
      // here on; confirm
1911  MessageSendParams params;
1912  params.timeout = timeout;
1913  params.followRedirects = false;
1914  params.stateful = true;
1915  params.chunkList = list;
1916 
      // NOTE(review): the listing's own numbering skips 1917 and 1919 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
1918 
1920  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1921 
1922  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1923  }
1924 
1925  //------------------------------------------------------------------------
1926  // Read data into scattered buffers in one operation - async
1927  //------------------------------------------------------------------------
1928  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1929  uint64_t offset,
1930  struct iovec *iov,
1931  int iovcnt,
1932  ResponseHandler *handler,
1933  time_t timeout )
1934  {
1935  XrdSysMutexHelper scopedLock( self->pMutex );
1936 
1937  if( self->pFileState == Error ) return self->pStatus;
1938 
1939  if( self->pFileState != Opened && self->pFileState != Recovering )
1940  return XRootDStatus( stError, errInvalidOp );
1941 
1942  Log *log = DefaultEnv::GetLog();
1943  log->Debug( FileMsg, "[%p@%s] Sending a read command for handle %#x to %s",
1944  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1945  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1946 
1947  Message *msg;
1948  ClientReadRequest *req;
1949  MessageUtils::CreateRequest( msg, req );
1950 
1951  // calculate the total read size
1952  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1953  {
1954  return acc + rhs.iov_len;
1955  } );
1956  req->requestid = kXR_read;
1957  req->offset = offset;
1958  req->rlen = size;
1959  msg->SetVirtReqID( kXR_virtReadv );
1960  memcpy( req->fhandle, self->pFileHandle, 4 );
1961 
1962  ChunkList *list = new ChunkList();
1963  list->reserve( iovcnt );
1964  uint64_t choff = offset;
1965  for( int i = 0; i < iovcnt; ++i )
1966  {
1967  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1968  choff += iov[i].iov_len;
1969  }
1970 
1972  MessageSendParams params;
1973  params.timeout = timeout;
1974  params.followRedirects = false;
1975  params.stateful = true;
1976  params.chunkList = list;
1978  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1979 
1980  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1981  }
1982 
1983 
1984  //----------------------------------------------------------------------------
1985  // Performs a custom operation on an open file, server implementation
1986  // dependent - async
      //
      // Sends a kXR_query request whose infotype is 'queryCode' and whose
      // payload is the content of 'arg'.
      //
      // self      - handler of the file the query refers to
      // queryCode - value stored in the request's infotype field
      // arg       - payload bytes appended to the request
      // handler   - user handler, wrapped in a StatefulHandler
      // timeout   - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
1987  //----------------------------------------------------------------------------
1988  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1989  QueryCode::Code queryCode,
1990  const Buffer &arg,
1991  ResponseHandler *handler,
1992  time_t timeout )
1993  {
1994  XrdSysMutexHelper scopedLock( self->pMutex );
1995 
1996  if( self->pFileState == Error ) return self->pStatus;
1997 
1998  if( self->pFileState != Opened && self->pFileState != Recovering )
1999  return XRootDStatus( stError, errInvalidOp );
2000 
      // The first four bytes of pFileHandle are printed as one 32-bit value
2001  Log *log = DefaultEnv::GetLog();
2002  log->Debug( FileMsg, "[%p@%s] Sending a fcntl command for handle %#x to %s",
2003  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2004  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2005 
      // The third argument asks for payload room of arg.GetSize() bytes
2006  Message *msg;
2007  ClientQueryRequest *req;
2008  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
2009 
2010  req->requestid = kXR_query;
2011  req->infotype = queryCode;
2012  req->dlen = arg.GetSize();
2013  memcpy( req->fhandle, self->pFileHandle, 4 );
      // NOTE(review): the last argument is presumably the offset at which the
      // bytes are placed, i.e. directly behind the request header - confirm
      // against Message::Append
2014  msg->Append( arg.GetBuffer(), arg.GetSize(), sizeof(ClientQueryRequest) );
2015 
2016  MessageSendParams params;
2017  params.timeout = timeout;
2018  params.followRedirects = false;
2019  params.stateful = true;
      // NOTE(review): the listing's own numbering skips 2020 and 2022 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
2021 
2023  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2024 
2025  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2026  }
2027 
2028  //----------------------------------------------------------------------------
2029  // Get access token to a file - async
      //
      // Sends a kXR_query request with infotype kXR_Qvisa for the open file
      // handle; the request carries no payload.
      //
      // self    - handler of the file the query refers to
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
2030  //----------------------------------------------------------------------------
2031  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
2032  ResponseHandler *handler,
2033  time_t timeout )
2034  {
2035  XrdSysMutexHelper scopedLock( self->pMutex );
2036 
2037  if( self->pFileState == Error ) return self->pStatus;
2038 
2039  if( self->pFileState != Opened && self->pFileState != Recovering )
2040  return XRootDStatus( stError, errInvalidOp );
2041 
      // The first four bytes of pFileHandle are printed as one 32-bit value
2042  Log *log = DefaultEnv::GetLog();
2043  log->Debug( FileMsg, "[%p@%s] Sending a visa command for handle %#x to %s",
2044  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2045  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2046 
2047  Message *msg;
2048  ClientQueryRequest *req;
2049  MessageUtils::CreateRequest( msg, req );
2050 
2051  req->requestid = kXR_query;
2052  req->infotype = kXR_Qvisa;
2053  memcpy( req->fhandle, self->pFileHandle, 4 );
2054 
2055  MessageSendParams params;
2056  params.timeout = timeout;
2057  params.followRedirects = false;
2058  params.stateful = true;
      // NOTE(review): the listing's own numbering skips 2059 and 2061 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
2060 
2062  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2063 
2064  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2065  }
2066 
2067  //------------------------------------------------------------------------
2068  // Set extended attributes - async
2069  //------------------------------------------------------------------------
2070  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
2071  const std::vector<xattr_t> &attrs,
2072  ResponseHandler *handler,
2073  time_t timeout )
2074  {
2075  XrdSysMutexHelper scopedLock( self->pMutex );
2076 
2077  if( self->pFileState == Error ) return self->pStatus;
2078 
2079  if( self->pFileState != Opened && self->pFileState != Recovering )
2080  return XRootDStatus( stError, errInvalidOp );
2081 
2082  Log *log = DefaultEnv::GetLog();
2083  log->Debug( FileMsg, "[%p@%s] Sending a fattr set command for handle %#x to %s",
2084  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2085  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2086 
2087  //--------------------------------------------------------------------------
2088  // Issue a new fattr get request
2089  //--------------------------------------------------------------------------
2090  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
2091  }
2092 
2093  //------------------------------------------------------------------------
2094  // Get extended attributes - async
2095  //------------------------------------------------------------------------
2096  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
2097  const std::vector<std::string> &attrs,
2098  ResponseHandler *handler,
2099  time_t timeout )
2100  {
2101  XrdSysMutexHelper scopedLock( self->pMutex );
2102 
2103  if( self->pFileState == Error ) return self->pStatus;
2104 
2105  if( self->pFileState != Opened && self->pFileState != Recovering )
2106  return XRootDStatus( stError, errInvalidOp );
2107 
2108  Log *log = DefaultEnv::GetLog();
2109  log->Debug( FileMsg, "[%p@%s] Sending a fattr get command for handle %#x to %s",
2110  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2111  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2112 
2113  //--------------------------------------------------------------------------
2114  // Issue a new fattr get request
2115  //--------------------------------------------------------------------------
2116  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
2117  }
2118 
2119  //------------------------------------------------------------------------
2120  // Delete extended attributes - async
2121  //------------------------------------------------------------------------
2122  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
2123  const std::vector<std::string> &attrs,
2124  ResponseHandler *handler,
2125  time_t timeout )
2126  {
2127  XrdSysMutexHelper scopedLock( self->pMutex );
2128 
2129  if( self->pFileState == Error ) return self->pStatus;
2130 
2131  if( self->pFileState != Opened && self->pFileState != Recovering )
2132  return XRootDStatus( stError, errInvalidOp );
2133 
2134  Log *log = DefaultEnv::GetLog();
2135  log->Debug( FileMsg, "[%p@%s] Sending a fattr del command for handle %#x to %s",
2136  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2137  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2138 
2139  //--------------------------------------------------------------------------
2140  // Issue a new fattr del request
2141  //--------------------------------------------------------------------------
2142  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2143  }
2144 
2145  //------------------------------------------------------------------------
2146  // List extended attributes - async
2147  //------------------------------------------------------------------------
2148  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2149  ResponseHandler *handler,
2150  time_t timeout )
2151  {
2152  XrdSysMutexHelper scopedLock( self->pMutex );
2153 
2154  if( self->pFileState == Error ) return self->pStatus;
2155 
2156  if( self->pFileState != Opened && self->pFileState != Recovering )
2157  return XRootDStatus( stError, errInvalidOp );
2158 
2159  Log *log = DefaultEnv::GetLog();
2160  log->Debug( FileMsg, "[%p@%s] Sending a fattr list command for handle %#x to %s",
2161  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2162  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2163 
2164  //--------------------------------------------------------------------------
2165  // Issue a new fattr get request
2166  //--------------------------------------------------------------------------
2167  static const std::vector<std::string> nothing;
2168  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2169  nothing, handler, timeout );
2170  }
2171 
2172  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request for the open file handle, with 'code'
      // stored in the request's opcode field; the request carries no payload.
      //
      // self    - handler of the file the checkpoint refers to
      // code    - value stored in the request's opcode field
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
      //
      // NOTE(review): the listing's own numbering jumps from 2172 to 2182, so
      // the original description of this function is missing from this extract
2182  //------------------------------------------------------------------------
2183  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2184  kXR_char code,
2185  ResponseHandler *handler,
2186  time_t timeout )
2187  {
2188  XrdSysMutexHelper scopedLock( self->pMutex );
2189 
2190  if( self->pFileState == Error ) return self->pStatus;
2191 
2192  if( self->pFileState != Opened && self->pFileState != Recovering )
2193  return XRootDStatus( stError, errInvalidOp );
2194 
      // The first four bytes of pFileHandle are printed as one 32-bit value
2195  Log *log = DefaultEnv::GetLog();
2196  log->Debug( FileMsg, "[%p@%s] Sending a checkpoint command for handle %#x to %s",
2197  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2198  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2199 
2200  Message *msg;
2201  ClientChkPointRequest *req;
2202  MessageUtils::CreateRequest( msg, req );
2203 
2204  req->requestid = kXR_chkpoint;
2205  req->opcode = code;
2206  memcpy( req->fhandle, self->pFileHandle, 4 );
2207 
2208  MessageSendParams params;
2209  params.timeout = timeout;
2210  params.followRedirects = false;
2211  params.stateful = true;
2212 
      // NOTE(review): the listing's own numbering skips 2213 and 2215 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
2214 
2216  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2217 
2218  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2219  }
2220 
2221  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request with opcode kXR_ckpXeq whose payload is a
      // kXR_write request header (offset, size, file handle); the data to write
      // is attached as a single chunk.
      //
      // self    - handler of the file being written
      // offset  - file offset stored in the embedded write request
      // size    - number of bytes, stored as the embedded request's dlen
      // buffer  - data to write, recorded as the only chunk
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
      //
      // NOTE(review): the listing's own numbering jumps from 2221 to 2231, so
      // the original description of this function is missing from this extract
2231  //------------------------------------------------------------------------
2232  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2233  uint64_t offset,
2234  uint32_t size,
2235  const void *buffer,
2236  ResponseHandler *handler,
2237  time_t timeout )
2238  {
2239  XrdSysMutexHelper scopedLock( self->pMutex );
2240 
2241  if( self->pFileState == Error ) return self->pStatus;
2242 
2243  if( self->pFileState != Opened && self->pFileState != Recovering )
2244  return XRootDStatus( stError, errInvalidOp );
2245 
      // The log text says "write", not "checkpoint"; the first four bytes of
      // pFileHandle are printed as one 32-bit value
2246  Log *log = DefaultEnv::GetLog();
2247  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2248  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2249  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2250 
      // The third argument asks for payload room for one ClientWriteRequest
2251  Message *msg;
2252  ClientChkPointRequest *req;
2253  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2254 
2255  req->requestid = kXR_chkpoint;
2256  req->opcode = kXR_ckpXeq;
2257  req->dlen = 24; // as specified in the protocol specification
2258  memcpy( req->fhandle, self->pFileHandle, 4 );
2259 
      // NOTE(review): 'wrtreq' has no visible declaration - the listing's own
      // numbering skips 2260, where it is presumably pointed at the payload
      // area behind the checkpoint header; confirm against the upstream file
2261  wrtreq->requestid = kXR_write;
2262  wrtreq->offset = offset;
2263  wrtreq->dlen = size;
2264  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2265 
      // The single chunk is recorded with offset 0; the file offset travels in
      // the embedded write request above
2266  ChunkList *list = new ChunkList();
2267  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2268 
2269  MessageSendParams params;
2270  params.timeout = timeout;
2271  params.followRedirects = false;
2272  params.stateful = true;
2273  params.chunkList = list;
2274 
      // NOTE(review): the listing's own numbering skips 2275 and 2277 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
2276 
2278  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2279 
2280  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2281  }
2282 
2283  //------------------------------------------------------------------------
      // Sends a kXR_chkpoint request with opcode kXR_ckpXeq whose payload is a
      // kXR_write request header; the data to write is the concatenation of the
      // non-empty iov entries, in array order.
      //
      // self    - handler of the file being written
      // offset  - file offset stored in the embedded write request
      // iov     - array of buffers to send
      // iovcnt  - number of entries in 'iov'
      // handler - user handler, wrapped in a StatefulHandler
      // timeout - copied into the send parameters
      //
      // Returns the stored pStatus when the file is in the Error state,
      // errInvalidOp when it is neither Opened nor Recovering, otherwise the
      // result of SendOrQueue.
      //
      // NOTE(review): the listing's own numbering jumps from 2283 to 2293, so
      // the original description of this function is missing from this extract
2293  //------------------------------------------------------------------------
2294  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2295  uint64_t offset,
2296  const struct iovec *iov,
2297  int iovcnt,
2298  ResponseHandler *handler,
2299  time_t timeout )
2300  {
2301  XrdSysMutexHelper scopedLock( self->pMutex );
2302 
2303  if( self->pFileState == Error ) return self->pStatus;
2304 
2305  if( self->pFileState != Opened && self->pFileState != Recovering )
2306  return XRootDStatus( stError, errInvalidOp );
2307 
      // The log text says "write", not "checkpoint"; the first four bytes of
      // pFileHandle are printed as one 32-bit value
2308  Log *log = DefaultEnv::GetLog();
2309  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
2310  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2311  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2312 
      // The third argument asks for payload room for one ClientWriteRequest
2313  Message *msg;
2314  ClientChkPointRequest *req;
2315  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2316 
2317  req->requestid = kXR_chkpoint;
2318  req->opcode = kXR_ckpXeq;
2319  req->dlen = 24; // as specified in the protocol specification
2320  memcpy( req->fhandle, self->pFileHandle, 4 );
2321 
      // Zero-length entries are dropped; every kept chunk is recorded with
      // offset 0 and only its length and buffer.
      // NOTE(review): 'size' is a uint32_t while iov_len is wider on common
      // platforms; a combined length above 32 bits would wrap - confirm that
      // callers bound the total
2322  ChunkList *list = new ChunkList();
2323  uint32_t size = 0;
2324  for( int i = 0; i < iovcnt; ++i )
2325  {
2326  if( iov[i].iov_len == 0 ) continue;
2327  size += iov[i].iov_len;
2328  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2329  (char*)iov[i].iov_base ) );
2330  }
2331 
      // NOTE(review): 'wrtreq' has no visible declaration - the listing's own
      // numbering skips 2332, where it is presumably pointed at the payload
      // area behind the checkpoint header; confirm against the upstream file
2333  wrtreq->requestid = kXR_write;
2334  wrtreq->offset = offset;
2335  wrtreq->dlen = size;
2336  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2337 
2338  MessageSendParams params;
2339  params.timeout = timeout;
2340  params.followRedirects = false;
2341  params.stateful = true;
2342  params.chunkList = list;
2343 
      // NOTE(review): the listing's own numbering skips 2344 and 2346 around
      // this point; statements may be missing from this extract - confirm
      // against the upstream file
2345 
2347  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2348 
2349  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2350  }
2351 
2352  //----------------------------------------------------------------------------
2353  // Check if the file is open
2354  //----------------------------------------------------------------------------
2356  {
2357  XrdSysMutexHelper scopedLock( pMutex );
2358 
2359  if( pFileState == Opened || pFileState == Recovering )
2360  return true;
2361  return false;
2362  }
2363 
2364  //----------------------------------------------------------------------------
2365  // Set file property
2366  //----------------------------------------------------------------------------
2367  bool FileStateHandler::SetProperty( const std::string &name,
2368  const std::string &value )
2369  {
2370  XrdSysMutexHelper scopedLock( pMutex );
2371  if( name == "ReadRecovery" )
2372  {
2373  if( value == "true" ) pDoRecoverRead = true;
2374  else pDoRecoverRead = false;
2375  return true;
2376  }
2377  else if( name == "WriteRecovery" )
2378  {
2379  if( value == "true" ) pDoRecoverWrite = true;
2380  else pDoRecoverWrite = false;
2381  return true;
2382  }
2383  else if( name == "FollowRedirects" )
2384  {
2385  if( value == "true" ) pFollowRedirects = true;
2386  else pFollowRedirects = false;
2387  return true;
2388  }
2389  else if( name == "BundledClose" )
2390  {
2391  if( value == "true" ) pAllowBundledClose = true;
2392  else pAllowBundledClose = false;
2393  return true;
2394  }
2395  return false;
2396  }
2397 
2398  //----------------------------------------------------------------------------
2399  // Get file property
2400  //----------------------------------------------------------------------------
2401  bool FileStateHandler::GetProperty( const std::string &name,
2402  std::string &value ) const
2403  {
2404  XrdSysMutexHelper scopedLock( pMutex );
2405  if( name == "ReadRecovery" )
2406  {
2407  if( pDoRecoverRead ) value = "true";
2408  else value = "false";
2409  return true;
2410  }
2411  else if( name == "WriteRecovery" )
2412  {
2413  if( pDoRecoverWrite ) value = "true";
2414  else value = "false";
2415  return true;
2416  }
2417  else if( name == "FollowRedirects" )
2418  {
2419  if( pFollowRedirects ) value = "true";
2420  else value = "false";
2421  return true;
2422  }
2423  else if( name == "DataServer" && pDataServer )
2424  { value = pDataServer->GetHostId(); return true; }
2425  else if( name == "LastURL" && pDataServer )
2426  { value = pDataServer->GetURL(); return true; }
2427  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2428  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2429  value = "";
2430  return false;
2431  }
2432 
2433  //----------------------------------------------------------------------------
2434  // Process the results of the opening operation
      //
      // When a host list is supplied, the data-server, load-balancer and
      // write-recovery URLs are rebuilt from it. Then, depending on 'status'
      // and 'openInfo', the file either moves to the Error state (queued
      // requests are failed) or the file handle, session id and stat info are
      // stored, queued requests are resent and the state becomes Opened.
      //
      // openInfo - open response; a null pointer is treated as a failure
      // hostList - hosts visited while opening; may be null
      //
      // NOTE(review): the first line of the signature and several statements
      // are absent from this extract (the listing's own numbering skips 2436,
      // 2494, 2520, 2523, 2526, 2560 and 2563); 'status', 'st', 'mon' and 'i'
      // are introduced on those lines - confirm against the upstream file
2435  //----------------------------------------------------------------------------
2437  const OpenInfo *openInfo,
2438  const HostList *hostList )
2439  {
2440  Log *log = DefaultEnv::GetLog();
2441  XrdSysMutexHelper scopedLock( pMutex );
2442 
2443  //--------------------------------------------------------------------------
2444  // Assign the data server and the load balancer
2445  //--------------------------------------------------------------------------
      // Until a host list says otherwise the host of the file URL is used as
      // the "last server"; it only appears in the failure log message below
2446  std::string lastServer = pFileUrl->GetHostId();
2447  if( hostList )
2448  {
      // Drop whatever an earlier open left behind; pDataServer is reassigned
      // right below, the other two only if the host list provides them
2449  delete pDataServer;
2450  delete pLoadBalancer;
2451  pLoadBalancer = 0;
2452  delete pWrtRecoveryRedir;
2453  pWrtRecoveryRedir = 0;
2454 
      // The last entry of the host list becomes the data server; it takes the
      // parameters of the file URL and, unless the virtual redirector is used
      // with a metalink, its path too
2455  pDataServer = new URL( hostList->back().url );
2456  pDataServer->SetParams( pFileUrl->GetParams() );
2457  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2458  lastServer = pDataServer->GetHostId();
      // The parameters of every host in the list are merged into the data
      // server's parameters.
      // NOTE(review): the meaning of the 'true' argument of MergeCGI is not
      // visible here - confirm
2459  HostList::const_iterator itC;
2460  URL::ParamsMap params = pDataServer->GetParams();
2461  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2462  {
2463  MessageUtils::MergeCGI( params,
2464  itC->url.GetParams(),
2465  true );
2466  }
2467  pDataServer->SetParams( params );
2468 
      // Walking the list backwards, the entry closest to the data server that
      // is flagged as a load balancer is remembered
2469  HostList::const_reverse_iterator it;
2470  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2471  if( it->loadBalancer )
2472  {
2473  pLoadBalancer = new URL( it->url );
2474  break;
2475  }
2476 
      // Same backwards search for an entry carrying the kXR_recoverWrts flag
2477  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2478  if( it->flags & kXR_recoverWrts )
2479  {
2480  pWrtRecoveryRedir = new URL( it->url );
2481  break;
2482  }
2483  }
2484 
2485  log->Debug(FileMsg, "[%p@%s] Open has returned with status %s",
2486  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2487 
2488  if( pDataServer && !pDataServer->IsLocalFile() )
2489  {
2490  //------------------------------------------------------------------------
2491  // Check if we are using a secure connection
2492  //------------------------------------------------------------------------
2493  XrdCl::AnyObject isencobj;
      // NOTE(review): the line that declares 'st' and starts this call
      // expression (listing number 2494) is missing from this extract
2495  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2496  if( st.IsOK() )
2497  {
      // NOTE(review): 'isenc' is deleted here, so Get() presumably hands over
      // ownership of the stored bool - confirm against AnyObject
2498  bool *isenc;
2499  isencobj.Get( isenc );
2500  pIsChannelEncrypted = isenc ? *isenc : false;
2501  delete isenc;
2502  }
2503  }
2504 
2505  //--------------------------------------------------------------------------
2506  // We have failed
2507  //--------------------------------------------------------------------------
      // A missing openInfo counts as a failure even when the status is OK
2508  pStatus = *status;
2509  if( !pStatus.IsOK() || !openInfo )
2510  {
2511  log->Debug(FileMsg, "[%p@%s] Error while opening at %s: %s",
2512  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2513  pStatus.ToStr().c_str() );
2514  FailQueuedMessages( pStatus );
2515  pFileState = Error;
2516 
2517  //------------------------------------------------------------------------
2518  // Report to monitoring
2519  //------------------------------------------------------------------------
      // NOTE(review): the declarations of 'mon' and 'i' (listing numbers 2520
      // and 2523) and one further statement (2526) are missing from this
      // extract
2521  if( mon )
2522  {
2524  i.file = pFileUrl;
2525  i.status = status;
2527  mon->Event( Monitor::EvErrIO, &i );
2528  }
2529  }
2530  //--------------------------------------------------------------------------
2531  // We have succeeded
2532  //--------------------------------------------------------------------------
2533  else
2534  {
2535  //------------------------------------------------------------------------
2536  // if requested file colocation or dup was done, don't do again on reopen
2537  //------------------------------------------------------------------------
2538  pOpenFlags &= ~(OpenFlags::Dup | OpenFlags::Samefs);
2539 
2540  //------------------------------------------------------------------------
2541  // Store the response info
2542  //------------------------------------------------------------------------
      // Stat info already held is kept when the response carries none
2543  openInfo->GetFileHandle( pFileHandle );
2544  pSessionId = openInfo->GetSessionId();
2545  if( openInfo->GetStatInfo() )
2546  {
2547  delete pStatInfo;
2548  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2549  }
2550 
2551  log->Debug( FileMsg, "[%p@%s] successfully opened at %s, handle: %#x, "
2552  "session id: %llu", (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
2553  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2554  (unsigned long long) pSessionId );
2555 
2556  //------------------------------------------------------------------------
2557  // Inform the monitoring about opening success
2558  //------------------------------------------------------------------------
      // NOTE(review): the declarations of 'mon' and 'i' (listing numbers 2560
      // and 2563) are missing from this extract
2559  gettimeofday( &pOpenTime, 0 );
2561  if( mon )
2562  {
2564  i.file = pFileUrl;
2565  i.dataServer = pDataServer->GetHostId();
2566  i.oFlags = pOpenFlags;
      // oFlags2 receives the open flags shifted right by 16 bits
2567  i.oFlags2 = pOpenFlags>>16;
2568  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2569  mon->Event( Monitor::EvOpen, &i );
2570  }
2571 
2572  //------------------------------------------------------------------------
2573  // Resend the queued messages if any
2574  //------------------------------------------------------------------------
      // As written, the resend happens before the state is set to Opened
2575  ReSendQueuedMessages();
2576  pFileState = Opened;
2577  }
2578  }
2579 
2580  //----------------------------------------------------------------------------
2581  // Process the results of the closing operation
      //
      // Logs the outcome, reports it through MonitorClose, resets the monitoring
      // variables, stores the status in pStatus and marks the file as Closed.
      // The Closed state is set regardless of what the status says.
      //
      // NOTE(review): the signature line (listing number 2583) is missing from
      // this extract; 'status' is presumably its only parameter - confirm
      // against the upstream file
2582  //----------------------------------------------------------------------------
2584  {
2585  Log *log = DefaultEnv::GetLog();
2586  XrdSysMutexHelper scopedLock( pMutex );
2587 
      // pDataServer is dereferenced here without a null check
2588  log->Debug(FileMsg, "[%p@%s] Close returned from %s with: %s", (void*)this,
2589  pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2590  status->ToStr().c_str() );
2591 
      // Sizes of the in-flight and to-be-recovered containers, for diagnostics
2592  log->Dump(FileMsg, "[%p@%s] Items in the fly %zu, queued for recovery %zu",
2593  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(), pToBeRecovered.size() );
2594 
2595  MonitorClose( status );
2596  ResetMonitoringVars();
2597 
2598  pStatus = *status;
2599  pFileState = Closed;
2600  }
2601 
2602  //----------------------------------------------------------------------------
2603  // Handle an error while sending a stateful message
2604  //----------------------------------------------------------------------------
2605  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2606  XRootDStatus *status,
2607  Message *message,
2608  ResponseHandler *userHandler,
2609  MessageSendParams &sendParams )
2610  {
2611  //--------------------------------------------------------------------------
2612  // It may be a redirection
2613  //--------------------------------------------------------------------------
2614  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2615  {
2616  static const std::string root = "root", xroot = "xroot", file = "file",
2617  roots = "roots", xroots = "xroots";
2618  std::string msg = status->GetErrorMessage();
2619  if( !msg.compare( 0, root.size(), root ) ||
2620  !msg.compare( 0, xroot.size(), xroot ) ||
2621  !msg.compare( 0, file.size(), file ) ||
2622  !msg.compare( 0, roots.size(), roots ) ||
2623  !msg.compare( 0, xroots.size(), xroots ) )
2624  {
2625  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2626  return;
2627  }
2628  }
2629 
2630  //--------------------------------------------------------------------------
2631  // Handle error
2632  //--------------------------------------------------------------------------
2633  Log *log = DefaultEnv::GetLog();
2634  XrdSysMutexHelper scopedLock( self->pMutex );
2635  self->pInTheFly.erase( message );
2636 
2637  log->Dump( FileMsg, "[%p@%s] File state error encountered. Message %s "
2638  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2639  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2640 
2641  //--------------------------------------------------------------------------
2642  // Report to monitoring
2643  //--------------------------------------------------------------------------
2645  if( mon )
2646  {
2648  i.file = self->pFileUrl;
2649  i.status = status;
2650 
2651  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2652  switch( req->header.requestid )
2653  {
2654  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
2660  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2661  }
2662 
2663  mon->Event( Monitor::EvErrIO, &i );
2664  }
2665 
2666  //--------------------------------------------------------------------------
2667  // The message is not recoverable
2668  // (message using a kernel buffer is not recoverable by definition)
2669  //--------------------------------------------------------------------------
2670  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2671  {
2672  log->Error( FileMsg, "[%p@%s] Fatal file state error. Message %s "
2673  "returned with %s", (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2674  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2675 
2676  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2677  delete status;
2678  return;
2679  }
2680 
2681  //--------------------------------------------------------------------------
2682  // Insert the message to the recovery queue and start the recovery
2683  // procedure if we don't have any more message in the fly
2684  //--------------------------------------------------------------------------
2685  self->pCloseReason = *status;
2686  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2687  delete status;
2688  }
2689 
2690  //----------------------------------------------------------------------------
2691  // Handle stateful redirect
2692  //----------------------------------------------------------------------------
2693  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2694  const std::string &redirectUrl,
2695  Message *message,
2696  ResponseHandler *userHandler,
2697  MessageSendParams &sendParams )
2698  {
2699  XrdSysMutexHelper scopedLock( self->pMutex );
2700  self->pInTheFly.erase( message );
2701 
2702  //--------------------------------------------------------------------------
2703  // Register the state redirect url and append the new cgi information to
2704  // the file URL
2705  //--------------------------------------------------------------------------
2706  if( !self->pStateRedirect )
2707  {
2708  std::ostringstream o;
2709  self->pStateRedirect = new URL( redirectUrl );
2710  URL::ParamsMap params = self->pFileUrl->GetParams();
2711  MessageUtils::MergeCGI( params,
2712  self->pStateRedirect->GetParams(),
2713  false );
2714  self->pFileUrl->SetParams( params );
2715  }
2716 
2717  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2718  }
2719 
2720  //----------------------------------------------------------------------------
2721  // Handle stateful response
2722  //----------------------------------------------------------------------------
2723  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2724  XRootDStatus *status,
2725  Message *message,
2726  AnyObject *response,
2727  HostList */*urlList*/ )
2728  {
2729  Log *log = DefaultEnv::GetLog();
2730  XrdSysMutexHelper scopedLock( self->pMutex );
2731 
2732  log->Dump( FileMsg, "[%p@%s] Got state response for message %s",
2733  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2734  message->GetObfuscatedDescription().c_str() );
2735 
2736  //--------------------------------------------------------------------------
2737  // Since this message may be the last "in-the-fly" and no recovery
2738  // is done if messages are in the fly, we may need to trigger recovery
2739  //--------------------------------------------------------------------------
2740  self->pInTheFly.erase( message );
2741  RunRecovery( self );
2742 
2743  //--------------------------------------------------------------------------
2744  // Play with the actual response before returning it. This is a good
2745  // place to do caching in the future.
2746  //--------------------------------------------------------------------------
2747  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2748  switch( req->header.requestid )
2749  {
2750  //------------------------------------------------------------------------
2751  // Cache the stat response
2752  //------------------------------------------------------------------------
2753  case kXR_stat:
2754  {
2755  StatInfo *info = 0;
2756  response->Get( info );
2757  delete self->pStatInfo;
2758  self->pStatInfo = new StatInfo( *info );
2759  break;
2760  }
2761 
2762  //------------------------------------------------------------------------
2763  // Handle read response
2764  //------------------------------------------------------------------------
2765  case kXR_read:
2766  {
2767  ++self->pRCount;
2768  self->pRBytes += req->read.rlen;
2769  break;
2770  }
2771 
2772  //------------------------------------------------------------------------
2773  // Handle read response
2774  //------------------------------------------------------------------------
2775  case kXR_pgread:
2776  {
2777  ++self->pRCount;
2778  self->pRBytes += req->pgread.rlen;
2779  break;
2780  }
2781 
2782  //------------------------------------------------------------------------
2783  // Handle readv response
2784  //------------------------------------------------------------------------
2785  case kXR_readv:
2786  {
2787  ++self->pVRCount;
2788  size_t segs = req->header.dlen/sizeof(readahead_list);
2789  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2790  for( size_t i = 0; i < segs; ++i )
2791  self->pVRBytes += dataChunk[i].rlen;
2792  self->pVSegs += segs;
2793  break;
2794  }
2795 
2796  //------------------------------------------------------------------------
2797  // Handle write response
2798  //------------------------------------------------------------------------
2799  case kXR_write:
2800  {
2801  ++self->pWCount;
2802  self->pWBytes += req->write.dlen;
2803  break;
2804  }
2805 
2806  //------------------------------------------------------------------------
2807  // Handle write response
2808  //------------------------------------------------------------------------
2809  case kXR_pgwrite:
2810  {
2811  ++self->pWCount;
2812  self->pWBytes += req->pgwrite.dlen;
2813  break;
2814  }
2815 
2816  //------------------------------------------------------------------------
2817  // Handle writev response
2818  //------------------------------------------------------------------------
2819  case kXR_writev:
2820  {
2821  ++self->pVWCount;
2822  size_t size = req->header.dlen/sizeof(readahead_list);
2823  XrdProto::write_list *wrtList =
2824  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2825  for( size_t i = 0; i < size; ++i )
2826  self->pVWBytes += wrtList[i].wlen;
2827  break;
2828  }
2829  };
2830  }
2831 
2832  //------------------------------------------------------------------------
2834  //------------------------------------------------------------------------
2835  void FileStateHandler::Tick( time_t now )
2836  {
2837  if (pMutex.CondLock())
2838  {TimeOutRequests( now );
2839  pMutex.UnLock();
2840  }
2841  }
2842 
2843  //----------------------------------------------------------------------------
2844  // Declare timeout on requests being recovered
2845  //----------------------------------------------------------------------------
2847  {
2848  if( !pToBeRecovered.empty() )
2849  {
2850  Log *log = DefaultEnv::GetLog();
2851  log->Dump( FileMsg, "[%p@%s] Got a timer event", (void*)this,
2852  pFileUrl->GetObfuscatedURL().c_str() );
2853  RequestList::iterator it;
2855  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2856  {
2857  if( it->params.expires <= now )
2858  {
2859  jobMan->QueueJob( new ResponseJob(
2860  it->handler,
2862  0, it->params.hostList ) );
2863  it = pToBeRecovered.erase( it );
2864  }
2865  else
2866  ++it;
2867  }
2868  }
2869  }
2870 
2871  //----------------------------------------------------------------------------
2872  // Called in the child process after the fork
2873  //----------------------------------------------------------------------------
2875  {
2876  Log *log = DefaultEnv::GetLog();
2877 
2878  if( pFileState == Closed || pFileState == Error )
2879  return;
2880 
2881  if( (IsReadOnly() && pDoRecoverRead) ||
2882  (!IsReadOnly() && pDoRecoverWrite) )
2883  {
2884  log->Debug( FileMsg, "[%p@%s] Putting the file in recovery state in "
2885  "process %d", (void*)this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2886  pFileState = Recovering;
2887  pInTheFly.clear();
2888  pToBeRecovered.clear();
2889  }
2890  else
2891  pFileState = Error;
2892  }
2893 
2894  //------------------------------------------------------------------------
2895  // Try other data server
2896  //------------------------------------------------------------------------
2897  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, time_t timeout )
2898  {
2899  XrdSysMutexHelper scopedLock( self->pMutex );
2900 
2901  if( self->pFileState != Opened || !self->pLoadBalancer )
2902  return XRootDStatus( stError, errInvalidOp );
2903 
2904  self->pFileState = Recovering;
2905 
2906  Log *log = DefaultEnv::GetLog();
2907  log->Debug( FileMsg, "[%p@%s] Reopen file at next data server.",
2908  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2909 
2910  // merge CGI
2911  auto lbcgi = self->pLoadBalancer->GetParams();
2912  auto dtcgi = self->pDataServer->GetParams();
2913  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2914  // update tried CGI
2915  auto itr = lbcgi.find( "tried" );
2916  if( itr == lbcgi.end() )
2917  lbcgi["tried"] = self->pDataServer->GetHostName();
2918  else
2919  {
2920  std::string tried = itr->second;
2921  tried += "," + self->pDataServer->GetHostName();
2922  lbcgi["tried"] = tried;
2923  }
2924  self->pLoadBalancer->SetParams( lbcgi );
2925 
2926  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2927  }
2928 
2929  //------------------------------------------------------------------------
2930  // Generic implementation of xattr operation
2931  //------------------------------------------------------------------------
2932  template<typename T>
2933  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2934  kXR_char subcode,
2935  kXR_char options,
2936  const std::vector<T> &attrs,
2937  ResponseHandler *handler,
2938  time_t timeout )
2939  {
2940  //--------------------------------------------------------------------------
2941  // Issue a new fattr request
2942  //--------------------------------------------------------------------------
2943  Message *msg;
2944  ClientFattrRequest *req;
2945  MessageUtils::CreateRequest( msg, req );
2946 
2947  req->requestid = kXR_fattr;
2948  req->subcode = subcode;
2949  req->numattr = attrs.size();
2950  req->options = options;
2951  memcpy( req->fhandle, self->pFileHandle, 4 );
2952  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
2953  if( !st.IsOK() ) return st;
2954 
2955  MessageSendParams params;
2956  params.timeout = timeout;
2957  params.followRedirects = false;
2958  params.stateful = true;
2960 
2962  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2963 
2964  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2965  }
2966 
2967  //----------------------------------------------------------------------------
2968  // Send a message to a host or put it in the recovery queue
2969  //----------------------------------------------------------------------------
2970  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2971  const URL &url,
2972  Message *msg,
2973  ResponseHandler *handler,
2974  MessageSendParams &sendParams )
2975  {
2976  //--------------------------------------------------------------------------
2977  // Recovering
2978  //--------------------------------------------------------------------------
2979  if( self->pFileState == Recovering )
2980  {
2981  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2982  }
2983 
2984  //--------------------------------------------------------------------------
2985  // Trying to send
2986  //--------------------------------------------------------------------------
2987  if( self->pFileState == Opened )
2988  {
2989  msg->SetSessionId( self->pSessionId );
2990  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2991 
2992  //------------------------------------------------------------------------
2993  // Invalid session id means that the connection has been broken while we
2994  // were idle so we haven't been informed about this fact earlier.
2995  //------------------------------------------------------------------------
2996  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2997  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2998 
2999  if( st.IsOK() )
3000  self->pInTheFly.insert(msg);
3001  else
3002  delete handler;
3003  return st;
3004  }
3005  return Status( stError, errInvalidOp );
3006  }
3007 
3008  //----------------------------------------------------------------------------
3009  // Check if the stateful error is recoverable
3010  //----------------------------------------------------------------------------
3011  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
3012  {
3013  const auto recoverable_errors = {
3017  errInternal,
3018  errTlsError,
3020  };
3021 
3022  if (pDoRecoverRead || pDoRecoverWrite)
3023  for (const auto error : recoverable_errors)
3024  if (status.code == error)
3025  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
3026 
3027  return false;
3028  }
3029 
3030  //----------------------------------------------------------------------------
3031  // Check if the file is open for read only
3032  //----------------------------------------------------------------------------
3033  bool FileStateHandler::IsReadOnly() const
3034  {
3035  // Keeping the check for append (with a cast) as this was previously tested,
3036  // but OpenFlags::Flags does not currently enumerate the Append flag
3037  if( (pOpenFlags & OpenFlags::Read) && !(pOpenFlags & OpenFlags::Update) &&
3038  !(pOpenFlags & static_cast<OpenFlags::Flags>(kXR_open_apnd)) )
3039  return true;
3040  return false;
3041  }
3042 
3043  //----------------------------------------------------------------------------
3044  // Recover a message
3045  //----------------------------------------------------------------------------
3046  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
3047  RequestData rd,
3048  bool callbackOnFailure )
3049  {
3050  self->pFileState = Recovering;
3051 
3052  Log *log = DefaultEnv::GetLog();
3053  log->Dump( FileMsg, "[%p@%s] Putting message %s in the recovery list",
3054  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3055  rd.request->GetObfuscatedDescription().c_str() );
3056 
3057  Status st = RunRecovery( self );
3058  if( st.IsOK() )
3059  {
3060  self->pToBeRecovered.push_back( rd );
3061  return st;
3062  }
3063 
3064  if( callbackOnFailure )
3065  self->FailMessage( rd, st );
3066 
3067  return st;
3068  }
3069 
3070  //----------------------------------------------------------------------------
3071  // Run the recovery procedure if appropriate
3072  //----------------------------------------------------------------------------
3073  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
3074  {
3075  if( self->pFileState != Recovering )
3076  return Status();
3077 
3078  if( !self->pInTheFly.empty() )
3079  return Status();
3080 
3081  Log *log = DefaultEnv::GetLog();
3082  log->Debug( FileMsg, "[%p@%s] Running the recovery procedure", (void*)self.get(),
3083  self->pFileUrl->GetObfuscatedURL().c_str() );
3084 
3085  Status st;
3086  if( self->pStateRedirect )
3087  {
3088  SendClose( self, 0 );
3089  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
3090  delete self->pStateRedirect; self->pStateRedirect = 0;
3091  }
3092  else if( self->IsReadOnly() && self->pLoadBalancer )
3093  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
3094  else
3095  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
3096 
3097  if( !st.IsOK() )
3098  {
3099  self->pFileState = Error;
3100  self->pStatus = st;
3101  self->FailQueuedMessages( st );
3102  }
3103 
3104  return st;
3105  }
3106 
3107  //----------------------------------------------------------------------------
3108  // Send a close and ignore the response
3109  //----------------------------------------------------------------------------
3110  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
3111  time_t timeout )
3112  {
3113  Message *msg;
3114  ClientCloseRequest *req;
3115  MessageUtils::CreateRequest( msg, req );
3116 
3117  req->requestid = kXR_close;
3118  memcpy( req->fhandle, self->pFileHandle, 4 );
3119 
3121  msg->SetSessionId( self->pSessionId );
3123  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
3124  MessageSendParams params;
3125  params.timeout = timeout;
3126  params.followRedirects = false;
3127  params.stateful = true;
3128 
3130 
3131  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3132  }
3133 
3134  //----------------------------------------------------------------------------
3135  // Re-open the current file at a given server
3136  //----------------------------------------------------------------------------
3137  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3138  const URL &url,
3139  time_t timeout )
3140  {
3141  Log *log = DefaultEnv::GetLog();
3142  log->Dump( FileMsg, "[%p@%s] Sending a recovery open command to %s",
3143  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3144 
3145  //--------------------------------------------------------------------------
3146  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3147  // procedure to delete a file that has been partially updated or fail it
3148  // because a partially uploaded file already exists.
3149  //--------------------------------------------------------------------------
3150  if( self->pOpenFlags & OpenFlags::Delete)
3151  {
3152  self->pOpenFlags &= ~OpenFlags::Delete;
3153  self->pOpenFlags |= OpenFlags::Update;
3154  }
3155 
3156  self->pOpenFlags &= ~OpenFlags::New;
3157 
3158  Message *msg;
3159  ClientOpenRequest *req;
3160  URL u = url;
3161 
3162  if( url.GetPath().empty() )
3163  u.SetPath( self->pFileUrl->GetPath() );
3164 
3165  std::string path = u.GetPathWithFilteredParams();
3166  MessageUtils::CreateRequest( msg, req, path.length() );
3167 
3168  req->requestid = kXR_open;
3169  req->mode = self->pOpenMode;
3170  req->options = (self->pOpenFlags & 0xffff);
3171  req->dlen = path.length();
3172  URL sendUrl;
3173  XRootDStatus st = FillFhTempl( self, url, msg, sendUrl );
3174  if( !st.IsOK() )
3175  {
3176  self->pStatus = st;
3177  self->pFileState = Closed;
3178  return st;
3179  }
3180  msg->Append( path.c_str(), path.length(), 24 );
3181 
3182  // create a new reopen handler
3183  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3184  // until we know that 'SendMessage' was successful)
3185  OpenHandler *openHandler = new OpenHandler( self, 0 );
3186  MessageSendParams params; params.timeout = timeout;
3189 
3190  //--------------------------------------------------------------------------
3191  // Issue the open request
3192  //--------------------------------------------------------------------------
3193  st = self->IssueRequest( sendUrl, msg, openHandler, params );
3194 
3195  // if there was a problem destroy the open handler
3196  if( !st.IsOK() )
3197  {
3198  delete openHandler;
3199  self->pStatus = st;
3200  self->pFileState = Closed;
3201  }
3202  return st;
3203  }
3204 
3205  //------------------------------------------------------------------------
3206  // Fail a message
3207  //------------------------------------------------------------------------
3208  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3209  {
3210  Log *log = DefaultEnv::GetLog();
3211  log->Dump( FileMsg, "[%p@%s] Failing message %s with %s",
3212  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3213  rd.request->GetObfuscatedDescription().c_str(),
3214  status.ToStr().c_str() );
3215 
3216  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3217  if( !sh )
3218  {
3219  Log *log = DefaultEnv::GetLog();
3220  log->Error( FileMsg, "[%p@%s] Internal error while recovering %s",
3221  (void*)this, pFileUrl->GetObfuscatedURL().c_str(),
3222  rd.request->GetObfuscatedDescription().c_str() );
3223  return;
3224  }
3225 
3227  ResponseHandler *userHandler = sh->GetUserHandler();
3228  jobMan->QueueJob( new ResponseJob(
3229  userHandler,
3230  new XRootDStatus( status ),
3231  0, rd.params.hostList ) );
3232 
3233  delete sh;
3234  }
3235 
3236  //----------------------------------------------------------------------------
3237  // Fail queued messages
3238  //----------------------------------------------------------------------------
3239  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3240  {
3241  RequestList::iterator it;
3242  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3243  FailMessage( *it, status );
3244  pToBeRecovered.clear();
3245  }
3246 
3247  //------------------------------------------------------------------------
3248  // Re-send queued messages
3249  //------------------------------------------------------------------------
3250  void FileStateHandler::ReSendQueuedMessages()
3251  {
3252  RequestList::iterator it;
3253  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3254  {
3255  it->request->SetSessionId( pSessionId );
3256  ReWriteFileHandle( it->request );
3257  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3258  it->handler, it->params );
3259  if( !st.IsOK() )
3260  FailMessage( *it, st );
3261  }
3262  pToBeRecovered.clear();
3263  }
3264 
3265  //------------------------------------------------------------------------
3266  // Re-write file handle
3267  //------------------------------------------------------------------------
3268  void FileStateHandler::ReWriteFileHandle( Message *msg )
3269  {
3271  switch( hdr->requestid )
3272  {
3273  case kXR_read:
3274  {
3276  memcpy( req->fhandle, pFileHandle, 4 );
3277  break;
3278  }
3279  case kXR_write:
3280  {
3282  memcpy( req->fhandle, pFileHandle, 4 );
3283  break;
3284  }
3285  case kXR_sync:
3286  {
3288  memcpy( req->fhandle, pFileHandle, 4 );
3289  break;
3290  }
3291  case kXR_truncate:
3292  {
3294  memcpy( req->fhandle, pFileHandle, 4 );
3295  break;
3296  }
3297  case kXR_readv:
3298  {
3300  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3301  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3302  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3303  break;
3304  }
3305  case kXR_writev:
3306  {
3307  ClientWriteVRequest *req =
3308  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3309  XrdProto::write_list *wrtList =
3310  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3311  size_t size = req->dlen / sizeof(XrdProto::write_list);
3312  for( size_t i = 0; i < size; ++i )
3313  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3314  break;
3315  }
3316  case kXR_pgread:
3317  {
3319  memcpy( req->fhandle, pFileHandle, 4 );
3320  break;
3321  }
3322  case kXR_pgwrite:
3323  {
3325  memcpy( req->fhandle, pFileHandle, 4 );
3326  break;
3327  }
3328  }
3329 
3330  Log *log = DefaultEnv::GetLog();
3331  log->Dump( FileMsg, "[%p@%s] Rewritten file handle for %s to %#x",
3332  (void*)this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3333  *((uint32_t*)pFileHandle) );
3335  }
3336 
3337  //----------------------------------------------------------------------------
3338  // Dispatch monitoring information on close
3339  //----------------------------------------------------------------------------
3340  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3341  {
3343  if( mon )
3344  {
3346  i.file = pFileUrl;
3347  i.oTOD = pOpenTime;
3348  gettimeofday( &i.cTOD, 0 );
3349  i.rBytes = pRBytes;
3350  i.vrBytes = pVRBytes;
3351  i.wBytes = pWBytes;
3352  i.vwBytes = pVWBytes;
3353  i.vSegs = pVSegs;
3354  i.rCount = pRCount;
3355  i.vCount = pVRCount;
3356  i.wCount = pWCount;
3357  i.status = status;
3358  mon->Event( Monitor::EvClose, &i );
3359  }
3360  }
3361 
3362  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3363  Message *msg,
3364  ResponseHandler *handler,
3365  MessageSendParams &sendParams )
3366  {
3367  // first handle Metalinks
3368  if( pUseVirtRedirector && url.IsMetalink() )
3369  return MessageUtils::RedirectMessage( url, msg, handler,
3370  sendParams, pLFileHandler );
3371 
3372  // than local file access
3373  if( url.IsLocalFile() )
3374  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3375 
3376  // and finally ordinary XRootD requests
3377  return MessageUtils::SendMessage( url, msg, handler,
3378  sendParams, pLFileHandler );
3379  }
3380 
3381  //------------------------------------------------------------------------
3382  // Send a write request with payload being stored in a kernel buffer
3383  //------------------------------------------------------------------------
3384  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3385  uint64_t offset,
3386  uint32_t length,
3387  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3388  ResponseHandler *handler,
3389  time_t timeout )
3390  {
3391  //--------------------------------------------------------------------------
3392  // Create the write request
3393  //--------------------------------------------------------------------------
3394  XrdSysMutexHelper scopedLock( self->pMutex );
3395 
3396  if( self->pFileState != Opened && self->pFileState != Recovering )
3397  return XRootDStatus( stError, errInvalidOp );
3398 
3399  Log *log = DefaultEnv::GetLog();
3400  log->Debug( FileMsg, "[%p@%s] Sending a write command for handle %#x to %s",
3401  (void*)self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3402  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3403 
3404  Message *msg;
3405  ClientWriteRequest *req;
3406  MessageUtils::CreateRequest( msg, req );
3407 
3408  req->requestid = kXR_write;
3409  req->offset = offset;
3410  req->dlen = length;
3411  memcpy( req->fhandle, self->pFileHandle, 4 );
3412 
3413  MessageSendParams params;
3414  params.timeout = timeout;
3415  params.followRedirects = false;
3416  params.stateful = true;
3417  params.kbuff = kbuff.release();
3418  params.chunkList = new ChunkList();
3419 
3421 
3423  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3424 
3425  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3426  }
3427 
3428  //------------------------------------------------------------------------
3429  // Fills in the file template value and optiont fields that need the
3430  // template (i.e. samefs and dup) in an Open message request
3431  //------------------------------------------------------------------------
3432  XRootDStatus FileStateHandler::FillFhTempl(
3433  std::shared_ptr<FileStateHandler> &self,
3434  const URL &url, Message *msg, URL &sendUrl)
3435  {
3437  sendUrl = url;
3438 
3439  if( !self->NeedFileTempl() )
3440  {
3441  // template file not required
3442  return XRootDStatus();
3443  }
3444 
3445  using wp = std::weak_ptr<FileStateHandler>;
3446  if( !self->pTemplateFileWp.owner_before(wp{}) &&
3447  !wp{}.owner_before(self->pTemplateFileWp) )
3448  {
3449  // no template file was set
3450  return XRootDStatus( stError, errInvalidArgs, 0,
3451  "File flags required a template file" );
3452  }
3453 
3454  // all the options that need template
3455  if( self->pOpenFlags & OpenFlags::Dup )
3456  req->optiont |= kXR_dup;
3457  if( self->pOpenFlags & OpenFlags::Samefs )
3458  req->optiont |= kXR_samefs;
3459 
3460  std::shared_ptr<FileStateHandler> tfp = self->pTemplateFileWp.lock();
3461  if(!tfp)
3462  return XRootDStatus( stError, errInvalidArgs, 0,
3463  "Template file object does not exist" );
3464 
3465  XrdSysMutexHelper scopedLock( tfp->pMutex );
3466 
3467  if( tfp->pFileState != Opened )
3468  return XRootDStatus( stError, errInvalidOp, 0,
3469  "Template file not open" );
3470 
3471  if (!tfp->pDataServer || !tfp->pFileHandle)
3472  return XRootDStatus( stError, errInvalidArgs, 0,
3473  "Template file not connected" );
3474 
3475  sendUrl.SetHostPort( tfp->pDataServer->GetHostName(),tfp->pDataServer->GetPort() );
3476  sendUrl.SetUserName( tfp->pDataServer->GetUserName() );
3477  msg->SetSessionId( tfp->pSessionId );
3478  memcpy( req->fhtemplt, tfp->pFileHandle, sizeof(req->fhtemplt) );
3479 
3480  if( !Utils::HasKSameFS( sendUrl ) )
3482 
3483  return XRootDStatus();
3484  }
3485 
3486  //------------------------------------------------------------------------
3487  // Clone file ranges into current file
3488  //------------------------------------------------------------------------
3489  XRootDStatus FileStateHandler::Clone(std::shared_ptr<FileStateHandler> &self,
3490  const CloneLocations &locs,
3491  ResponseHandler *handler,
3492  time_t timeout )
3493  {
3494  XrdSysMutexHelper scopedLock( self->pMutex );
3495 
3496  if( self->pFileState == Error ) return self->pStatus;
3497 
3498  if( self->pFileState != Opened && self->pFileState != Recovering )
3499  return XRootDStatus( stError, errInvalidOp );
3500 
3501  if( !Utils::HasKSameFS( *self->pDataServer ) )
3503 
3504  Log *log = DefaultEnv::GetLog();
3505  log->Debug( FileMsg, "[%p@%s] Sending a clone command for handle %#x to %s",
3506  self.get(), self->pFileUrl->GetURL().c_str(),
3507  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3508 
3509  Message *msg;
3510  ClientReadRequest *req;
3511 
3512  size_t nrange = locs.locations.size();
3513 
3514  MessageUtils::CreateRequest( msg, req, sizeof(XrdProto::clone_list)*nrange );
3515 
3516  req->requestid = kXR_clone;
3517  req->dlen = sizeof(XrdProto::clone_list)*nrange;
3518  memcpy( req->fhandle, self->pFileHandle, 4 );
3519 
3521  int idx=0;
3522  for(auto &loc: locs.locations)
3523  {
3524  if( !loc.file )
3525  return XRootDStatus( stError, errInvalidOp, 0,
3526  "Template file not available" );
3527 
3528  FileStateHandlerTemplate *fht = dynamic_cast<FileStateHandlerTemplate*>(loc.file.get());
3529  if( !fht )
3530  return XRootDStatus( stError, errInvalidOp, 0,
3531  "Template file invalid" );
3532 
3533  std::shared_ptr<FileStateHandler> tfp = fht->pTemplateFileWp.lock();
3534  if( !tfp )
3535  return XRootDStatus( stError, errInvalidOp, 0,
3536  "Template file object does not exist" );
3537 
3538  XrdSysMutexHelper scopedLock( tfp->pMutex );
3539  if( tfp->pFileState != Opened )
3540  return XRootDStatus( stError, errInvalidOp, 0,
3541  "Template file not open" );
3542 
3543  if( tfp->pSessionId != self->pSessionId )
3544  return XRootDStatus( stError, errInvalidOp, 0,
3545  "Clone source not at same location as destination" );
3546 
3547  memcpy( cl[idx].srcFH, tfp->pFileHandle, 4 );
3548  cl[idx].srcOffs = loc.srcOffs;
3549  cl[idx].srcLen = loc.srcLen;
3550  cl[idx].dstOffs = loc.dstOffs;
3551  ++idx;
3552  }
3553 
3555  MessageSendParams params;
3556  params.timeout = timeout;
3557  params.followRedirects = false;
3558  params.stateful = true;
3560  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3561 
3562  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3563  }
3564 }
kXR_unt16 requestid
Definition: XProtocol.hh:511
kXR_unt16 requestid
Definition: XProtocol.hh:666
kXR_unt16 requestid
Definition: XProtocol.hh:847
@ kXR_fattrDel
Definition: XProtocol.hh:300
@ kXR_fattrSet
Definition: XProtocol.hh:303
@ kXR_fattrList
Definition: XProtocol.hh:302
@ kXR_fattrGet
Definition: XProtocol.hh:301
#define kXR_suppgrw
Definition: XProtocol.hh:1216
kXR_char fhandle[4]
Definition: XProtocol.hh:565
kXR_char fhandle[4]
Definition: XProtocol.hh:823
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:903
kXR_char fhandle[4]
Definition: XProtocol.hh:848
kXR_char fhandle[4]
Definition: XProtocol.hh:812
kXR_int64 offset
Definition: XProtocol.hh:682
kXR_unt16 requestid
Definition: XProtocol.hh:680
@ kXR_virtReadv
Definition: XProtocol.hh:152
kXR_char fhtemplt[4]
Definition: XProtocol.hh:516
kXR_unt16 options
Definition: XProtocol.hh:513
static const int kXR_ckpXeq
Definition: XProtocol.hh:218
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:904
kXR_unt16 requestid
Definition: XProtocol.hh:257
@ kXR_async
Definition: XProtocol.hh:488
@ kXR_open_apnd
Definition: XProtocol.hh:492
@ kXR_retstat
Definition: XProtocol.hh:493
struct ClientRequestHdr header
Definition: XProtocol.hh:887
kXR_char fhandle[4]
Definition: XProtocol.hh:543
#define kXR_recoverWrts
Definition: XProtocol.hh:1208
kXR_unt16 optiont
Definition: XProtocol.hh:514
kXR_unt16 infotype
Definition: XProtocol.hh:667
kXR_char fhandle[4]
Definition: XProtocol.hh:681
kXR_char fhandle[4]
Definition: XProtocol.hh:258
kXR_unt16 requestid
Definition: XProtocol.hh:159
kXR_char fhandle[4]
Definition: XProtocol.hh:669
@ kXR_read
Definition: XProtocol.hh:126
@ kXR_open
Definition: XProtocol.hh:123
@ kXR_writev
Definition: XProtocol.hh:144
@ kXR_clone
Definition: XProtocol.hh:145
@ kXR_readv
Definition: XProtocol.hh:138
@ kXR_sync
Definition: XProtocol.hh:129
@ kXR_fattr
Definition: XProtocol.hh:133
@ kXR_query
Definition: XProtocol.hh:114
@ kXR_write
Definition: XProtocol.hh:132
@ kXR_truncate
Definition: XProtocol.hh:141
@ kXR_stat
Definition: XProtocol.hh:130
@ kXR_pgread
Definition: XProtocol.hh:143
@ kXR_chkpoint
Definition: XProtocol.hh:125
@ kXR_close
Definition: XProtocol.hh:116
@ kXR_pgwrite
Definition: XProtocol.hh:139
kXR_int32 dlen
Definition: XProtocol.hh:684
struct ClientReadRequest read
Definition: XProtocol.hh:909
kXR_int32 rlen
Definition: XProtocol.hh:696
kXR_unt16 requestid
Definition: XProtocol.hh:808
kXR_int32 dlen
Definition: XProtocol.hh:517
kXR_char fhandle[4]
Definition: XProtocol.hh:835
kXR_unt16 mode
Definition: XProtocol.hh:512
kXR_unt16 requestid
Definition: XProtocol.hh:542
kXR_unt16 requestid
Definition: XProtocol.hh:822
kXR_char fhandle[4]
Definition: XProtocol.hh:206
kXR_int64 offset
Definition: XProtocol.hh:697
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:849
@ kXR_dup
Definition: XProtocol.hh:503
@ kXR_samefs
Definition: XProtocol.hh:504
struct ClientWriteRequest write
Definition: XProtocol.hh:918
kXR_int32 rlen
Definition: XProtocol.hh:683
kXR_unt16 requestid
Definition: XProtocol.hh:706
@ kXR_Qvisa
Definition: XProtocol.hh:656
kXR_int32 dlen
Definition: XProtocol.hh:161
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1404
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, time_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
std::weak_ptr< FileStateHandler > pTemplateFileWp
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, time_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, time_t timeout)
Try other data server.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Clone(std::shared_ptr< FileStateHandler > &self, const CloneLocations &locs, ResponseHandler *handler, time_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, QueryCode::Code queryCode, const Buffer &arg, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, time_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, time_t timeout=0)
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, time_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Opened
Opening has succeeded.
@ Recovering
Recovering from an error.
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus OpenUsingTemplate(std::shared_ptr< FileStateHandler > &self, ExportedFileTemplate *templ, const std::string &url, OpenFlags::Flags flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus PreRead(std::shared_ptr< FileStateHandler > &self, const TractList &tracts, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, time_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, time_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, time_t timeout=0)
bool IsOpen() const
Check if the file is open.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Open operation.
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4 bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetHostPort(const std::string &hostName, int port)
Definition: XrdClURL.hh:206
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition: XrdClURL.cc:331
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:498
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static bool HasKSameFS(const XrdCl::URL &url)
Check if given server supports kXR_clone and kXR_samefs.
Definition: XrdClUtils.hh:267
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Get the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
std::vector< TractInfo > TractList
List of Tracts.
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
none object for initializing empty Optional
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:328
kXR_char fhandle[4]
Definition: XProtocol.hh:318
kXR_unt16 requestid
Definition: XProtocol.hh:317
kXR_unt16 requestid
Definition: XProtocol.hh:861
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< CloneLocation > locations
Definition: XrdClFile.hh:1053
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
uint16_t oFlags2
OpenFlags upper 16 bits.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
Open flags, may be or'd when appropriate.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Samefs
Open file on the same filesystem as another.
@ Update
Open for reading and writing.
@ Dup
Open file duplicating content from another.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
Code
XRootD query request codes.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted