XRootD
XrdClFileStateHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClStatus.hh"
29 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClConstants.hh"
35 #include "XrdCl/XrdClMonitor.hh"
36 #include "XrdCl/XrdClFileTimer.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdCl/XrdClAnyObject.hh"
41 #include "XrdCl/XrdClUtils.hh"
42 
43 #ifdef WITH_XRDEC
44 #include "XrdCl/XrdClEcHandler.hh"
45 #endif
46 
47 #include "XrdOuc/XrdOucCRC.hh"
49 #include "XrdOuc/XrdOucUtils.hh"
50 
52 #include "XrdSys/XrdSysPageSize.hh"
53 #include "XrdSys/XrdSysPthread.hh"
54 
55 #include <sstream>
56 #include <memory>
57 #include <numeric>
58 #include <sys/time.h>
59 #include <uuid/uuid.h>
60 #include <mutex>
61 
62 namespace
63 {
64  //----------------------------------------------------------------------------
65  // Helper callback for handling PgRead responses
66  //----------------------------------------------------------------------------
67  class PgReadHandler : public XrdCl::ResponseHandler
68  {
69  friend class PgReadRetryHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
74  // Constructor
75  //------------------------------------------------------------------------
76  PgReadHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
77  XrdCl::ResponseHandler *userHandler,
78  uint64_t orgOffset ) :
79  stateHandler( stateHandler ),
80  userHandler( userHandler ),
81  orgOffset( orgOffset ),
82  maincall( true ),
83  retrycnt( 0 ),
84  nbrepair( 0 )
85  {
86  }
87 
88  //------------------------------------------------------------------------
89  // Handle the response
90  //------------------------------------------------------------------------
92  XrdCl::AnyObject *response,
93  XrdCl::HostList *hostList )
94  {
95  using namespace XrdCl;
96 
97  std::unique_lock<std::mutex> lck( mtx );
98 
99  if( !maincall )
100  {
101  //--------------------------------------------------------------------
102  // We are serving PgRead retry request
103  //--------------------------------------------------------------------
104  --retrycnt;
105  if( !status->IsOK() )
106  st.reset( status );
107  else
108  {
109  delete status; // by convention other args are null (see PgReadRetryHandler)
110  ++nbrepair; // update number of repaired pages
111  }
112 
113  if( retrycnt == 0 )
114  {
115  //------------------------------------------------------------------
116  // All retries came back
117  //------------------------------------------------------------------
118  if( st->IsOK() )
119  {
120  PageInfo &pginf = XrdCl::To<PageInfo>( *resp );
121  pginf.SetNbRepair( nbrepair );
122  userHandler->HandleResponseWithHosts( st.release(), resp.release(), hosts.release() );
123  }
124  else
125  userHandler->HandleResponseWithHosts( st.release(), 0, 0 );
126  lck.unlock();
127  delete this;
128  }
129 
130  return;
131  }
132 
133  //----------------------------------------------------------------------
134  // We are serving main PgRead request
135  //----------------------------------------------------------------------
136  if( !status->IsOK() )
137  {
138  //--------------------------------------------------------------------
139  // The main PgRead request has failed
140  //--------------------------------------------------------------------
141  userHandler->HandleResponseWithHosts( status, response, hostList );
142  lck.unlock();
143  delete this;
144  return;
145  }
146 
147  maincall = false;
148 
149  //----------------------------------------------------------------------
150  // Do the integrity check
151  //----------------------------------------------------------------------
152  PageInfo *pginf = 0;
153  response->Get( pginf );
154 
155  uint64_t pgoff = pginf->GetOffset();
156  uint32_t bytesRead = pginf->GetLength();
157  std::vector<uint32_t> &cksums = pginf->GetCksums();
158  char *buffer = reinterpret_cast<char*>( pginf->GetBuffer() );
159  size_t nbpages = XrdOucPgrwUtils::csNum( pgoff, bytesRead );
160  uint32_t pgsize = XrdSys::PageSize - pgoff % XrdSys::PageSize;
161  if( pgsize > bytesRead ) pgsize = bytesRead;
162 
163  for( size_t pgnb = 0; pgnb < nbpages; ++pgnb )
164  {
165  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
166  if( crcval != cksums[pgnb] )
167  {
168  Log *log = DefaultEnv::GetLog();
169  log->Info( FileMsg, "[0x%x@%s] Received corrupted page, will retry page #%d.",
170  this, stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
171 
172  XRootDStatus st = XrdCl::FileStateHandler::PgReadRetry( stateHandler, pgoff, pgsize, pgnb, buffer, this, 0 );
173  if( !st.IsOK())
174  {
175  *status = st; // the reason for this failure
176  break;
177  }
178  ++retrycnt; // update the retry counter
179  }
180 
181  bytesRead -= pgsize;
182  buffer += pgsize;
183  pgoff += pgsize;
184  pgsize = XrdSys::PageSize;
185  if( pgsize > bytesRead ) pgsize = bytesRead;
186  }
187 
188 
189  if( retrycnt == 0 )
190  {
191  //--------------------------------------------------------------------
192  // All went well!
193  //--------------------------------------------------------------------
194  userHandler->HandleResponseWithHosts( status, response, hostList );
195  lck.unlock();
196  delete this;
197  return;
198  }
199 
200  //----------------------------------------------------------------------
201  // We have to wait for retries!
202  //----------------------------------------------------------------------
203  resp.reset( response );
204  hosts.reset( hostList );
205  st.reset( status );
206  }
207 
208  void UpdateCksum( size_t pgnb, uint32_t crcval )
209  {
210  if( resp )
211  {
212  XrdCl::PageInfo *pginf = 0;
213  resp->Get( pginf );
214  pginf->GetCksums()[pgnb] = crcval;
215  }
216  }
217 
218  private:
219 
220  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
221  XrdCl::ResponseHandler *userHandler;
222  uint64_t orgOffset;
223 
224  std::unique_ptr<XrdCl::AnyObject> resp;
225  std::unique_ptr<XrdCl::HostList> hosts;
226  std::unique_ptr<XrdCl::XRootDStatus> st;
227 
228  std::mutex mtx;
229  bool maincall;
230  size_t retrycnt;
231  size_t nbrepair;
232 
233  };
234 
235  //----------------------------------------------------------------------------
236  // Helper callback for handling PgRead retries
237  //----------------------------------------------------------------------------
238  class PgReadRetryHandler : public XrdCl::ResponseHandler
239  {
240  public:
241 
242  PgReadRetryHandler( PgReadHandler *pgReadHandler, size_t pgnb ) : pgReadHandler( pgReadHandler ),
243  pgnb( pgnb )
244  {
245 
246  }
247 
248  //------------------------------------------------------------------------
249  // Handle the response
250  //------------------------------------------------------------------------
251  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
252  XrdCl::AnyObject *response,
253  XrdCl::HostList *hostList )
254  {
255  using namespace XrdCl;
256 
257  if( !status->IsOK() )
258  {
259  Log *log = DefaultEnv::GetLog();
260  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
261  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
262  pgReadHandler->HandleResponseWithHosts( status, response, hostList );
263  delete this;
264  return;
265  }
266 
267  XrdCl::PageInfo *pginf = 0;
268  response->Get( pginf );
269  if( pginf->GetLength() > (uint32_t)XrdSys::PageSize || pginf->GetCksums().size() != 1 )
270  {
271  Log *log = DefaultEnv::GetLog();
272  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
273  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
274  // we retry a page at a time so the length cannot exceed 4KB
275  DeleteArgs( status, response, hostList );
276  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
277  delete this;
278  return;
279  }
280 
281  uint32_t crcval = XrdOucCRC::Calc32C( pginf->GetBuffer(), pginf->GetLength() );
282  if( crcval != pginf->GetCksums().front() )
283  {
284  Log *log = DefaultEnv::GetLog();
285  log->Info( FileMsg, "[0x%x@%s] Failed to recover page #%d.",
286  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
287  DeleteArgs( status, response, hostList );
288  pgReadHandler->HandleResponseWithHosts( new XRootDStatus( stError, errDataError ), 0, 0 );
289  delete this;
290  return;
291  }
292 
293  Log *log = DefaultEnv::GetLog();
294  log->Info( FileMsg, "[0x%x@%s] Successfully recovered page #%d.",
295  this, pgReadHandler->stateHandler->pFileUrl->GetObfuscatedURL().c_str(), pgnb );
296 
297  DeleteArgs( 0, response, hostList );
298  pgReadHandler->UpdateCksum( pgnb, crcval );
299  pgReadHandler->HandleResponseWithHosts( status, 0, 0 );
300  delete this;
301  }
302 
303  private:
304 
305  inline void DeleteArgs( XrdCl::XRootDStatus *status,
306  XrdCl::AnyObject *response,
307  XrdCl::HostList *hostList )
308  {
309  delete status;
310  delete response;
311  delete hostList;
312  }
313 
314  PgReadHandler *pgReadHandler;
315  size_t pgnb;
316  };
317 
318  //----------------------------------------------------------------------------
319  // Handle PgRead substitution with ordinary Read
320  //----------------------------------------------------------------------------
322  {
323  public:
324 
325  //------------------------------------------------------------------------
326  // Constructor
327  //------------------------------------------------------------------------
328  PgReadSubstitutionHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
329  XrdCl::ResponseHandler *userHandler ) :
330  stateHandler( stateHandler ),
331  userHandler( userHandler )
332  {
333  }
334 
335  //------------------------------------------------------------------------
336  // Handle the response
337  //------------------------------------------------------------------------
338  void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
339  XrdCl::AnyObject *rdresp,
340  XrdCl::HostList *hostList )
341  {
342  if( !status->IsOK() )
343  {
344  userHandler->HandleResponseWithHosts( status, rdresp, hostList );
345  delete this;
346  return;
347  }
348 
349  using namespace XrdCl;
350 
351  ChunkInfo *chunk = 0;
352  rdresp->Get( chunk );
353 
354  std::vector<uint32_t> cksums;
355  if( stateHandler->pIsChannelEncrypted )
356  {
357  size_t nbpages = chunk->length / XrdSys::PageSize;
358  if( chunk->length % XrdSys::PageSize )
359  ++nbpages;
360  cksums.reserve( nbpages );
361 
362  size_t size = chunk->length;
363  char *buffer = reinterpret_cast<char*>( chunk->buffer );
364 
365  for( size_t pg = 0; pg < nbpages; ++pg )
366  {
367  size_t pgsize = XrdSys::PageSize;
368  if( pgsize > size ) pgsize = size;
369  uint32_t crcval = XrdOucCRC::Calc32C( buffer, pgsize );
370  cksums.push_back( crcval );
371  buffer += pgsize;
372  size -= pgsize;
373  }
374  }
375 
376  PageInfo *pages = new PageInfo( chunk->offset, chunk->length,
377  chunk->buffer, std::move( cksums ) );
378  delete rdresp;
379  AnyObject *response = new AnyObject();
380  response->Set( pages );
381  userHandler->HandleResponseWithHosts( status, response, hostList );
382 
383  delete this;
384  }
385 
386  private:
387 
388  std::shared_ptr<XrdCl::FileStateHandler> stateHandler;
389  XrdCl::ResponseHandler *userHandler;
390  };
391 
392  //----------------------------------------------------------------------------
393  // Object that does things to the FileStateHandler when kXR_open returns
394  // and then calls the user handler
395  //----------------------------------------------------------------------------
396  class OpenHandler: public XrdCl::ResponseHandler
397  {
398  public:
399  //------------------------------------------------------------------------
400  // Constructor
401  //------------------------------------------------------------------------
402  OpenHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
403  XrdCl::ResponseHandler *userHandler ):
404  pStateHandler( stateHandler ),
405  pUserHandler( userHandler )
406  {
407  }
408 
409  //------------------------------------------------------------------------
410  // Handle the response
411  //------------------------------------------------------------------------
412  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
413  XrdCl::AnyObject *response,
414  XrdCl::HostList *hostList )
415  {
416  using namespace XrdCl;
417 
418  //----------------------------------------------------------------------
419  // Extract the statistics info
420  //----------------------------------------------------------------------
421  OpenInfo *openInfo = 0;
422  if( status->IsOK() )
423  response->Get( openInfo );
424 #ifdef WITH_XRDEC
425  else
426  //--------------------------------------------------------------------
427  // Handle EC redirect
428  //--------------------------------------------------------------------
429  if( status->code == errRedirect )
430  {
431  std::string ecurl = status->GetErrorMessage();
432  EcHandler *ecHandler = GetEcHandler( hostList->front().url, ecurl );
433  if( ecHandler )
434  {
435  pStateHandler->pPlugin = ecHandler; // set the plugin for the File object
436  ecHandler->Open( pStateHandler->pOpenFlags, pUserHandler, 0/*TODO figure out right value for the timeout*/ );
437  return;
438  }
439  }
440 #endif
441  //----------------------------------------------------------------------
442  // Notify the state handler and the client and say bye bye
443  //----------------------------------------------------------------------
444  pStateHandler->OnOpen( status, openInfo, hostList );
445  delete response;
446  if( pUserHandler )
447  pUserHandler->HandleResponseWithHosts( status, 0, hostList );
448  else
449  {
450  delete status;
451  delete hostList;
452  }
453  delete this;
454  }
455 
456  private:
457  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
458  XrdCl::ResponseHandler *pUserHandler;
459  };
460 
461  //----------------------------------------------------------------------------
462  // Object that does things to the FileStateHandler when kXR_close returns
463  // and then calls the user handler
464  //----------------------------------------------------------------------------
465  class CloseHandler: public XrdCl::ResponseHandler
466  {
467  public:
468  //------------------------------------------------------------------------
469  // Constructor
470  //------------------------------------------------------------------------
471  CloseHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
472  XrdCl::ResponseHandler *userHandler,
473  XrdCl::Message *message ):
474  pStateHandler( stateHandler ),
475  pUserHandler( userHandler ),
476  pMessage( message )
477  {
478  }
479 
480  //------------------------------------------------------------------------
482  //------------------------------------------------------------------------
483  virtual ~CloseHandler()
484  {
485  delete pMessage;
486  }
487 
488  //------------------------------------------------------------------------
489  // Handle the response
490  //------------------------------------------------------------------------
491  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
492  XrdCl::AnyObject *response,
493  XrdCl::HostList *hostList )
494  {
495  pStateHandler->OnClose( status );
496  if( pUserHandler )
497  pUserHandler->HandleResponseWithHosts( status, response, hostList );
498  else
499  {
500  delete response;
501  delete status;
502  delete hostList;
503  }
504 
505  delete this;
506  }
507 
508  private:
509  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
510  XrdCl::ResponseHandler *pUserHandler;
511  XrdCl::Message *pMessage;
512  };
513 
514  //----------------------------------------------------------------------------
515  // Stateful message handler
516  //----------------------------------------------------------------------------
517  class StatefulHandler: public XrdCl::ResponseHandler
518  {
519  public:
520  //------------------------------------------------------------------------
521  // Constructor
522  //------------------------------------------------------------------------
523  StatefulHandler( std::shared_ptr<XrdCl::FileStateHandler> &stateHandler,
524  XrdCl::ResponseHandler *userHandler,
525  XrdCl::Message *message,
526  const XrdCl::MessageSendParams &sendParams ):
527  pStateHandler( stateHandler ),
528  pUserHandler( userHandler ),
529  pMessage( message ),
530  pSendParams( sendParams )
531  {
532  }
533 
534  //------------------------------------------------------------------------
535  // Destructor
536  //------------------------------------------------------------------------
537  virtual ~StatefulHandler()
538  {
539  delete pMessage;
540  delete pSendParams.chunkList;
541  delete pSendParams.kbuff;
542  }
543 
544  //------------------------------------------------------------------------
545  // Handle the response
546  //------------------------------------------------------------------------
547  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
548  XrdCl::AnyObject *response,
549  XrdCl::HostList *hostList )
550  {
551  using namespace XrdCl;
552  std::unique_ptr<AnyObject> responsePtr( response );
553  pSendParams.hostList = hostList;
554 
555  //----------------------------------------------------------------------
556  // Houston we have a problem...
557  //----------------------------------------------------------------------
558  if( !status->IsOK() )
559  {
560  XrdCl::FileStateHandler::OnStateError( pStateHandler, status, pMessage, this, pSendParams );
561  return;
562  }
563 
564  //----------------------------------------------------------------------
565  // We're clear
566  //----------------------------------------------------------------------
567  responsePtr.release();
568  XrdCl::FileStateHandler::OnStateResponse( pStateHandler, status, pMessage, response, hostList );
569  if( pUserHandler )
570  pUserHandler->HandleResponseWithHosts( status, response, hostList );
571  else
572  {
573  delete status,
574  delete response;
575  delete hostList;
576  }
577  delete this;
578  }
579 
580  //------------------------------------------------------------------------
582  //------------------------------------------------------------------------
583  XrdCl::ResponseHandler *GetUserHandler()
584  {
585  return pUserHandler;
586  }
587 
588  private:
589  std::shared_ptr<XrdCl::FileStateHandler> pStateHandler;
590  XrdCl::ResponseHandler *pUserHandler;
591  XrdCl::Message *pMessage;
592  XrdCl::MessageSendParams pSendParams;
593  };
594 
595  //----------------------------------------------------------------------------
596  // Release-buffer Handler
597  //----------------------------------------------------------------------------
598  class ReleaseBufferHandler: public XrdCl::ResponseHandler
599  {
600  public:
601 
602  //------------------------------------------------------------------------
603  // Constructor
604  //------------------------------------------------------------------------
605  ReleaseBufferHandler( XrdCl::Buffer &&buffer, XrdCl::ResponseHandler *handler ) :
606  buffer( std::move( buffer ) ),
607  handler( handler )
608  {
609  }
610 
611  //------------------------------------------------------------------------
612  // Handle the response
613  //------------------------------------------------------------------------
614  virtual void HandleResponseWithHosts( XrdCl::XRootDStatus *status,
615  XrdCl::AnyObject *response,
616  XrdCl::HostList *hostList )
617  {
618  if (handler)
619  handler->HandleResponseWithHosts( status, response, hostList );
620  }
621 
622  //------------------------------------------------------------------------
623  // Get the underlying buffer
624  //------------------------------------------------------------------------
625  XrdCl::Buffer& GetBuffer()
626  {
627  return buffer;
628  }
629 
630  private:
631  XrdCl::Buffer buffer;
632  XrdCl::ResponseHandler *handler;
633  };
634 }
635 
636 namespace XrdCl
637 {
//----------------------------------------------------------------------------
// Constructor
//
// Sets the object up in the Closed state with read recovery, write recovery,
// redirect following and the virtual redirector switched on, and allocates
// the 4-byte file handle buffer and the local file handler.
//
// @param plugin : reference to the file plug-in pointer, kept in pPlugin
//----------------------------------------------------------------------------
FileStateHandler::FileStateHandler( FilePlugIn *& plugin ):
  pFileState( Closed ),
  pStatInfo( 0 ),
  pFileUrl( 0 ),
  pDataServer( 0 ),
  pLoadBalancer( 0 ),
  pStateRedirect( 0 ),
  pWrtRecoveryRedir( 0 ),
  pFileHandle( 0 ),
  pOpenMode( 0 ),
  pOpenFlags( 0 ),
  pSessionId( 0 ),
  pDoRecoverRead( true ),
  pDoRecoverWrite( true ),
  pFollowRedirects( true ),
  pUseVirtRedirector( true ),
  pIsChannelEncrypted( false ),
  pAllowBundledClose( false ),
  pPlugin( plugin )
{
  pFileHandle = new uint8_t[4];
  ResetMonitoringVars();
  // NOTE(review): two source lines are absent from this listing at this
  // point - check the upstream file for the statements that belong here.
  pLFileHandler = new LocalFileHandler();
}
667 
668  //------------------------------------------------------------------------
673  //------------------------------------------------------------------------
674  FileStateHandler::FileStateHandler( bool useVirtRedirector, FilePlugIn *& plugin ):
675  pFileState( Closed ),
676  pStatInfo( 0 ),
677  pFileUrl( 0 ),
678  pDataServer( 0 ),
679  pLoadBalancer( 0 ),
680  pStateRedirect( 0 ),
681  pWrtRecoveryRedir( 0 ),
682  pFileHandle( 0 ),
683  pOpenMode( 0 ),
684  pOpenFlags( 0 ),
685  pSessionId( 0 ),
686  pDoRecoverRead( true ),
687  pDoRecoverWrite( true ),
688  pFollowRedirects( true ),
689  pUseVirtRedirector( useVirtRedirector ),
690  pAllowBundledClose( false ),
691  pPlugin( plugin )
692  {
693  pFileHandle = new uint8_t[4];
694  ResetMonitoringVars();
697  pLFileHandler = new LocalFileHandler();
698  }
699 
700  //----------------------------------------------------------------------------
701  // Destructor
702  //----------------------------------------------------------------------------
704  {
705  //--------------------------------------------------------------------------
706  // This, in principle, should never ever happen. Except for the case
707  // when we're interfaced with ROOT that may call this desctructor from
708  // its garbage collector, from its __cxa_finalize, ie. after the XrdCl lib
709  // has been finalized by the linker. So, if we don't have the log object
710  // at this point we just give up the hope.
711  //--------------------------------------------------------------------------
712  if( DefaultEnv::GetLog() && pSessionId && !pDataServer->IsLocalFile() ) // if the file object was bound to a physical connection
713  DefaultEnv::GetPostMaster()->DecFileInstCnt( *pDataServer );
714 
717 
720 
721  if( pFileState != Closed && DefaultEnv::GetLog() )
722  {
723  XRootDStatus st;
724  MonitorClose( &st );
725  ResetMonitoringVars();
726  }
727 
728  // check if the logger is still there, this is only for root, as root might
729  // have unload us already so in this case we don't want to do anything
730  if( DefaultEnv::GetLog() && pUseVirtRedirector && pFileUrl && pFileUrl->IsMetalink() )
731  {
733  registry.Release( *pFileUrl );
734  }
735 
736  delete pStatInfo;
737  delete pFileUrl;
738  delete pDataServer;
739  delete pLoadBalancer;
740  delete [] pFileHandle;
741  delete pLFileHandler;
742  }
743 
//----------------------------------------------------------------------------
// Open the file pointed to by the given URL
//
// @param self    : the state handler to operate on
// @param url     : URL of the file to be opened
// @param flags   : open flags, sent together with kXR_async and kXR_retstat
// @param mode    : open mode, sent as given
// @param handler : handler passed on to the OpenHandler created here
// @param timeout : timeout value stored in the send parameters
// @return        : the stored status if the object is in the Error state,
//                  errInvalidOp if it is open, closing or recovering,
//                  errInvalidArgs for an invalid URL, otherwise the status
//                  of issuing the request
//----------------------------------------------------------------------------
XRootDStatus FileStateHandler::Open( std::shared_ptr<FileStateHandler> &self,
                                     const std::string                 &url,
                                     uint16_t                           flags,
                                     uint16_t                           mode,
                                     ResponseHandler                   *handler,
                                     uint16_t                           timeout )
{
  // held for the whole call
  XrdSysMutexHelper scopedLock( self->pMutex );

  //--------------------------------------------------------------------------
  // Check if we can proceed
  //--------------------------------------------------------------------------
  if( self->pFileState == Error )
    return self->pStatus;

  if( self->pFileState == OpenInProgress )
  // NOTE(review): the statement controlled by this `if` is absent from this
  // listing - restore it from the upstream file.

  if( self->pFileState == CloseInProgress || self->pFileState == Opened ||
      self->pFileState == Recovering )
    return XRootDStatus( stError, errInvalidOp );

  self->pFileState = OpenInProgress;

  //--------------------------------------------------------------------------
  // Check if the parameters are valid
  //--------------------------------------------------------------------------
  Log *log = DefaultEnv::GetLog();

  // drop the URL left over from a previous open
  if( self->pFileUrl )
  {
    if( self->pUseVirtRedirector && self->pFileUrl->IsMetalink() )
    {
      // NOTE(review): the line that declares `registry` is absent from this
      // listing - restore it from the upstream file.
      registry.Release( *self->pFileUrl );
    }
    delete self->pFileUrl;
    self->pFileUrl = 0;
  }

  self->pFileUrl = new URL( url );

  //--------------------------------------------------------------------------
  // Add unique uuid to each open request so replays due to error/timeout
  // recovery can be correctly handled.
  //--------------------------------------------------------------------------
  URL::ParamsMap cgi = self->pFileUrl->GetParams();
  uuid_t uuid;
  char requuid[37]= {0};
  uuid_generate( uuid );
  uuid_unparse( uuid, requuid );
  cgi["xrdcl.requuid"] = requuid;
  self->pFileUrl->SetParams( cgi );

  if( !self->pFileUrl->IsValid() )
  {
    log->Error( FileMsg, "[0x%x@%s] Trying to open invalid url: %s",
                self.get(), self->pFileUrl->GetPath().c_str(), url.c_str() );
    self->pStatus    = XRootDStatus( stError, errInvalidArgs );
    self->pFileState = Closed;
    return self->pStatus;
  }

  //--------------------------------------------------------------------------
  // Check if the recovery procedures should be enabled
  // (once switched off, a recovery flag is never switched back on here)
  //--------------------------------------------------------------------------
  const URL::ParamsMap &urlParams = self->pFileUrl->GetParams();
  URL::ParamsMap::const_iterator it;
  it = urlParams.find( "xrdcl.recover-reads" );
  if( (it != urlParams.end() && it->second == "false") ||
      !self->pDoRecoverRead )
  {
    self->pDoRecoverRead = false;
    log->Debug( FileMsg, "[0x%x@%s] Read recovery procedures are disabled",
                self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
  }

  it = urlParams.find( "xrdcl.recover-writes" );
  if( (it != urlParams.end() && it->second == "false") ||
      !self->pDoRecoverWrite )
  {
    self->pDoRecoverWrite = false;
    log->Debug( FileMsg, "[0x%x@%s] Write recovery procedures are disabled",
                self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
  }

  //--------------------------------------------------------------------------
  // Open the file
  //--------------------------------------------------------------------------
  log->Debug( FileMsg, "[0x%x@%s] Sending an open command", self.get(),
              self->pFileUrl->GetObfuscatedURL().c_str() );

  self->pOpenMode  = mode;
  self->pOpenFlags = flags;
  OpenHandler *openHandler = new OpenHandler( self, handler );

  Message           *msg;
  ClientOpenRequest *req;
  std::string        path = self->pFileUrl->GetPathWithFilteredParams();
  MessageUtils::CreateRequest( msg, req, path.length() );

  req->requestid = kXR_open;
  req->mode      = mode;
  req->options   = flags | kXR_async | kXR_retstat;
  req->dlen      = path.length();
  // the path is appended at offset 24 of the message
  msg->Append( path.c_str(), path.length(), 24 );

  // NOTE(review): one source line is absent from this listing at this point.
  MessageSendParams params; params.timeout = timeout;
  params.followRedirects = self->pFollowRedirects;
  // NOTE(review): one source line is absent from this listing at this point.

  XRootDStatus st = self->IssueRequest( *self->pFileUrl, msg, openHandler, params );

  if( !st.IsOK() )
  {
    // the request never left, so the handler will not be called back
    delete openHandler;
    self->pStatus    = st;
    self->pFileState = Closed;
    return st;
  }
  return st;
}
870 
//----------------------------------------------------------------------------
// Close the file object
//
// @param self    : the state handler to operate on
// @param handler : handler passed on to the CloseHandler created here
// @param timeout : timeout value stored in the send parameters
// @return        : the stored status if the object is in the Error state,
//                  stOK/suAlreadyDone if the file is already closed,
//                  errInvalidOp if an open or a recovery is in progress or
//                  if requests are still in flight and a bundled close is
//                  not allowed, otherwise the status of issuing the request
//----------------------------------------------------------------------------
XRootDStatus FileStateHandler::Close( std::shared_ptr<FileStateHandler> &self,
                                      ResponseHandler                   *handler,
                                      uint16_t                           timeout )
{
  // held for the whole call
  XrdSysMutexHelper scopedLock( self->pMutex );

  //--------------------------------------------------------------------------
  // Check if we can proceed
  //--------------------------------------------------------------------------
  if( self->pFileState == Error )
    return self->pStatus;

  if( self->pFileState == CloseInProgress )
  // NOTE(review): the statement controlled by this `if` is absent from this
  // listing - restore it from the upstream file.

  if( self->pFileState == Closed )
    return XRootDStatus( stOK, suAlreadyDone );

  if( self->pFileState == OpenInProgress || self->pFileState == Recovering )
    return XRootDStatus( stError, errInvalidOp );

  if( !self->pAllowBundledClose && !self->pInTheFly.empty() )
    return XRootDStatus( stError, errInvalidOp );

  self->pFileState = CloseInProgress;

  Log *log = DefaultEnv::GetLog();
  log->Debug( FileMsg, "[0x%x@%s] Sending a close command for handle 0x%x to "
              "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
              *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );

  //--------------------------------------------------------------------------
  // Close the file
  //--------------------------------------------------------------------------
  Message            *msg;
  ClientCloseRequest *req;
  MessageUtils::CreateRequest( msg, req );

  req->requestid = kXR_close;
  memcpy( req->fhandle, self->pFileHandle, 4 );

  // NOTE(review): one source line is absent from this listing at this point.
  msg->SetSessionId( self->pSessionId );
  CloseHandler *closeHandler = new CloseHandler( self, handler, msg );
  MessageSendParams params;
  params.timeout         = timeout;
  params.followRedirects = false;
  params.stateful        = true;
  // NOTE(review): one source line is absent from this listing at this point.

  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, closeHandler, params );

  if( !st.IsOK() )
  {
    // an invalid-session error means the connection to the server has been
    // closed, which in turn means that the server closed the file already
    if( st.code == errInvalidSession || st.code == errSocketDisconnected ||
        // NOTE(review): one line of this condition is absent from this
        // listing - restore it from the upstream file.
        st.code == errPollerError    || st.code == errSocketError )
    {
      self->pFileState = Closed;
      ResponseJob *job = new ResponseJob( closeHandler, new XRootDStatus(),
                                          nullptr, nullptr );
      // NOTE(review): one source line is absent from this listing at this
      // point; as shown, `job` is created but never used.
      return XRootDStatus();
    }

    // any other failure puts the object into the Error state
    delete closeHandler;
    self->pStatus    = st;
    self->pFileState = Error;
    return st;
  }
  return st;
}
948 
949  //----------------------------------------------------------------------------
950  // Stat the file
951  //----------------------------------------------------------------------------
952  XRootDStatus FileStateHandler::Stat( std::shared_ptr<FileStateHandler> &self,
953  bool force,
954  ResponseHandler *handler,
955  uint16_t timeout )
956  {
957  XrdSysMutexHelper scopedLock( self->pMutex );
958 
959  if( self->pFileState == Error ) return self->pStatus;
960 
961  if( self->pFileState != Opened && self->pFileState != Recovering )
962  return XRootDStatus( stError, errInvalidOp );
963 
964  //--------------------------------------------------------------------------
965  // Return the cached info
966  //--------------------------------------------------------------------------
967  if( !force )
968  {
969  AnyObject *obj = new AnyObject();
970  obj->Set( new StatInfo( *self->pStatInfo ) );
971  if (handler)
972  handler->HandleResponseWithHosts( new XRootDStatus(), obj, new HostList() );
973  return XRootDStatus();
974  }
975 
976  Log *log = DefaultEnv::GetLog();
977  log->Debug( FileMsg, "[0x%x@%s] Sending a stat command for handle 0x%x to "
978  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
979  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
980 
981  //--------------------------------------------------------------------------
982  // Issue a new stat request
983  // stating a file handle doesn't work (fixed in 3.2.0) so we need to
984  // stat the pat
985  //--------------------------------------------------------------------------
986  Message *msg;
987  ClientStatRequest *req;
988  std::string path = self->pFileUrl->GetPath();
989  MessageUtils::CreateRequest( msg, req );
990 
991  req->requestid = kXR_stat;
992  memcpy( req->fhandle, self->pFileHandle, 4 );
993 
994  MessageSendParams params;
995  params.timeout = timeout;
996  params.followRedirects = false;
997  params.stateful = true;
999 
1001  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1002 
1003  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1004  }
1005 
1006  //----------------------------------------------------------------------------
1007  // Read a data chunk at a given offset - sync
1008  //----------------------------------------------------------------------------
1009  XRootDStatus FileStateHandler::Read( std::shared_ptr<FileStateHandler> &self,
1010  uint64_t offset,
1011  uint32_t size,
1012  void *buffer,
1013  ResponseHandler *handler,
1014  uint16_t timeout )
1015  {
1016  XrdSysMutexHelper scopedLock( self->pMutex );
1017 
1018  if( self->pFileState == Error ) return self->pStatus;
1019 
1020  if( self->pFileState != Opened && self->pFileState != Recovering )
1021  return XRootDStatus( stError, errInvalidOp );
1022 
1023  Log *log = DefaultEnv::GetLog();
1024  log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1025  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1026  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1027 
1028  Message *msg;
1029  ClientReadRequest *req;
1030  MessageUtils::CreateRequest( msg, req );
1031 
1032  req->requestid = kXR_read;
1033  req->offset = offset;
1034  req->rlen = size;
1035  memcpy( req->fhandle, self->pFileHandle, 4 );
1036 
1037  ChunkList *list = new ChunkList();
1038  list->push_back( ChunkInfo( offset, size, buffer ) );
1039 
1041  MessageSendParams params;
1042  params.timeout = timeout;
1043  params.followRedirects = false;
1044  params.stateful = true;
1045  params.chunkList = list;
1047  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1048 
1049  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1050  }
1051 
1052  //------------------------------------------------------------------------
1053  // Read data pages at a given offset
1054  //------------------------------------------------------------------------
1055  XRootDStatus FileStateHandler::PgRead( std::shared_ptr<FileStateHandler> &self,
1056  uint64_t offset,
1057  uint32_t size,
1058  void *buffer,
1059  ResponseHandler *handler,
1060  uint16_t timeout )
1061  {
1062  int issupported = true;
1063  AnyObject obj;
1065  int protver = 0;
1066  XRootDStatus st2 = Utils::GetProtocolVersion( *self->pDataServer, protver );
1067  if( st1.IsOK() && st2.IsOK() )
1068  {
1069  int *ptr = 0;
1070  obj.Get( ptr );
1071  issupported = ( *ptr & kXR_suppgrw ) && ( protver >= kXR_PROTPGRWVERSION );
1072  delete ptr;
1073  }
1074  else
1075  issupported = false;
1076 
1077  if( !issupported )
1078  {
1079  DefaultEnv::GetLog()->Debug( FileMsg, "[0x%x@%s] PgRead not supported; substituting with Read.",
1080  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
1081  ResponseHandler *substitHandler = new PgReadSubstitutionHandler( self, handler );
1082  auto st = Read( self, offset, size, buffer, substitHandler, timeout );
1083  if( !st.IsOK() ) delete substitHandler;
1084  return st;
1085  }
1086 
1087  ResponseHandler* pgHandler = new PgReadHandler( self, handler, offset );
1088  auto st = PgReadImpl( self, offset, size, buffer, PgReadFlags::None, pgHandler, timeout );
1089  if( !st.IsOK() ) delete pgHandler;
1090  return st;
1091  }
1092 
1093  XRootDStatus FileStateHandler::PgReadRetry( std::shared_ptr<FileStateHandler> &self,
1094  uint64_t offset,
1095  uint32_t size,
1096  size_t pgnb,
1097  void *buffer,
1098  PgReadHandler *handler,
1099  uint16_t timeout )
1100  {
1101  if( size > (uint32_t)XrdSys::PageSize )
1102  return XRootDStatus( stError, errInvalidArgs, EINVAL,
1103  "PgRead retry size exceeded 4KB." );
1104 
1105  ResponseHandler *retryHandler = new PgReadRetryHandler( handler, pgnb );
1106  XRootDStatus st = PgReadImpl( self, offset, size, buffer, PgReadFlags::Retry, retryHandler, timeout );
1107  if( !st.IsOK() ) delete retryHandler;
1108  return st;
1109  }
1110 
1111  XRootDStatus FileStateHandler::PgReadImpl( std::shared_ptr<FileStateHandler> &self,
1112  uint64_t offset,
1113  uint32_t size,
1114  void *buffer,
1115  uint16_t flags,
1116  ResponseHandler *handler,
1117  uint16_t timeout )
1118  {
1119  XrdSysMutexHelper scopedLock( self->pMutex );
1120 
1121  if( self->pFileState == Error ) return self->pStatus;
1122 
1123  if( self->pFileState != Opened && self->pFileState != Recovering )
1124  return XRootDStatus( stError, errInvalidOp );
1125 
1126  Log *log = DefaultEnv::GetLog();
1127  log->Debug( FileMsg, "[0x%x@%s] Sending a pgread command for handle 0x%x to "
1128  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1129  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1130 
1131  Message *msg;
1132  ClientPgReadRequest *req;
1133  MessageUtils::CreateRequest( msg, req, sizeof( ClientPgReadReqArgs ) );
1134 
1135  req->requestid = kXR_pgread;
1136  req->offset = offset;
1137  req->rlen = size;
1138  memcpy( req->fhandle, self->pFileHandle, 4 );
1139 
1140  //--------------------------------------------------------------------------
1141  // Now adjust the message size so it can hold PgRead arguments
1142  //--------------------------------------------------------------------------
1143  req->dlen = sizeof( ClientPgReadReqArgs );
1144  void *newBuf = msg->GetBuffer( sizeof( ClientPgReadRequest ) );
1145  memset( newBuf, 0, sizeof( ClientPgReadReqArgs ) );
1146  ClientPgReadReqArgs *args = reinterpret_cast<ClientPgReadReqArgs*>(
1147  msg->GetBuffer( sizeof( ClientPgReadRequest ) ) );
1148  args->reqflags = flags;
1149 
1150  ChunkList *list = new ChunkList();
1151  list->push_back( ChunkInfo( offset, size, buffer ) );
1152 
1154  MessageSendParams params;
1155  params.timeout = timeout;
1156  params.followRedirects = false;
1157  params.stateful = true;
1158  params.chunkList = list;
1160  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1161 
1162  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1163  }
1164 
1165  //----------------------------------------------------------------------------
1166  // Write a data chunk at a given offset - async
1167  //----------------------------------------------------------------------------
1168  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1169  uint64_t offset,
1170  uint32_t size,
1171  const void *buffer,
1172  ResponseHandler *handler,
1173  uint16_t timeout )
1174  {
1175  XrdSysMutexHelper scopedLock( self->pMutex );
1176 
1177  if( self->pFileState == Error ) return self->pStatus;
1178 
1179  if( self->pFileState != Opened && self->pFileState != Recovering )
1180  return XRootDStatus( stError, errInvalidOp );
1181 
1182  Log *log = DefaultEnv::GetLog();
1183  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1184  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1185  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1186 
1187  Message *msg;
1188  ClientWriteRequest *req;
1189  MessageUtils::CreateRequest( msg, req );
1190 
1191  req->requestid = kXR_write;
1192  req->offset = offset;
1193  req->dlen = size;
1194  memcpy( req->fhandle, self->pFileHandle, 4 );
1195 
1196  ChunkList *list = new ChunkList();
1197  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
1198 
1199  MessageSendParams params;
1200  params.timeout = timeout;
1201  params.followRedirects = false;
1202  params.stateful = true;
1203  params.chunkList = list;
1204 
1206 
1208  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1209 
1210  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1211  }
1212 
1213  //----------------------------------------------------------------------------
1214  // Write a data chunk at a given offset
1215  //----------------------------------------------------------------------------
1216  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1217  uint64_t offset,
1218  Buffer &&buffer,
1219  ResponseHandler *handler,
1220  uint16_t timeout )
1221  {
1222  //--------------------------------------------------------------------------
1223  // If the memory is not page (4KB) aligned we cannot use the kernel buffer
1224  // so fall back to normal write
1225  //--------------------------------------------------------------------------
1226  if( !XrdSys::KernelBuffer::IsPageAligned( buffer.GetBuffer() ) || self->pIsChannelEncrypted )
1227  {
1228  Log *log = DefaultEnv::GetLog();
1229  log->Info( FileMsg, "[0x%x@%s] Buffer is not page aligned (4KB), cannot "
1230  "convert it to kernel space buffer.", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1231  *((uint32_t*)self->pFileHandle) );
1232 
1233  void *buff = buffer.GetBuffer();
1234  uint32_t size = buffer.GetSize();
1235  ReleaseBufferHandler *wrtHandler =
1236  new ReleaseBufferHandler( std::move( buffer ), handler );
1237  XRootDStatus st = self->Write( self, offset, size, buff, wrtHandler, timeout );
1238  if( !st.IsOK() )
1239  {
1240  buffer = std::move( wrtHandler->GetBuffer() );
1241  delete wrtHandler;
1242  }
1243  return st;
1244  }
1245 
1246  //--------------------------------------------------------------------------
1247  // Transfer the data from user space to kernel space
1248  //--------------------------------------------------------------------------
1249  uint32_t length = buffer.GetSize();
1250  char *ubuff = buffer.Release();
1251 
1252  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1253  ssize_t ret = XrdSys::Move( ubuff, *kbuff, length );
1254  if( ret < 0 )
1255  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1256 
1257  //--------------------------------------------------------------------------
1258  // Now create a write request and enqueue it
1259  //--------------------------------------------------------------------------
1260  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1261  }
1262 
1263  //----------------------------------------------------------------------------
1264  // Write a data from a given file descriptor at a given offset - async
1265  //----------------------------------------------------------------------------
1266  XRootDStatus FileStateHandler::Write( std::shared_ptr<FileStateHandler> &self,
1267  uint64_t offset,
1268  uint32_t size,
1269  Optional<uint64_t> fdoff,
1270  int fd,
1271  ResponseHandler *handler,
1272  uint16_t timeout )
1273  {
1274  //--------------------------------------------------------------------------
1275  // Read the data from the file descriptor into a kernel buffer
1276  //--------------------------------------------------------------------------
1277  std::unique_ptr<XrdSys::KernelBuffer> kbuff( new XrdSys::KernelBuffer() );
1278  ssize_t ret = fdoff ? XrdSys::Read( fd, *kbuff, size, *fdoff ) :
1279  XrdSys::Read( fd, *kbuff, size );
1280  if( ret < 0 )
1281  return XRootDStatus( stError, errInternal, XProtocol::mapError( errno ) );
1282 
1283  //--------------------------------------------------------------------------
1284  // Now create a write request and enqueue it
1285  //--------------------------------------------------------------------------
1286  return WriteKernelBuffer( self, offset, ret, std::move( kbuff ), handler, timeout );
1287  }
1288 
1289  //----------------------------------------------------------------------------
1290  // Write number of pages at a given offset - async
1291  //----------------------------------------------------------------------------
1292  XRootDStatus FileStateHandler::PgWrite( std::shared_ptr<FileStateHandler> &self,
1293  uint64_t offset,
1294  uint32_t size,
1295  const void *buffer,
1296  std::vector<uint32_t> &cksums,
1297  ResponseHandler *handler,
1298  uint16_t timeout )
1299  {
1300  //--------------------------------------------------------------------------
1301  // Resolve timeout value
1302  //--------------------------------------------------------------------------
1303  if( timeout == 0 )
1304  {
1305  int val = DefaultRequestTimeout;
1306  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
1307  timeout = val;
1308  }
1309 
1310  //--------------------------------------------------------------------------
1311  // Validate the digest vector size
1312  //--------------------------------------------------------------------------
1313  if( cksums.empty() )
1314  {
1315  const char *data = static_cast<const char*>( buffer );
1316  XrdOucPgrwUtils::csCalc( data, offset, size, cksums );
1317  }
1318  else
1319  {
1320  size_t crc32cCnt = XrdOucPgrwUtils::csNum( offset, size );
1321  if( crc32cCnt != cksums.size() )
1322  return XRootDStatus( stError, errInvalidArgs, 0, "Wrong number of crc32c digests." );
1323  }
1324 
1325  //--------------------------------------------------------------------------
1326  // Create a context for PgWrite operation
1327  //--------------------------------------------------------------------------
1328  struct pgwrt_t
1329  {
1330  pgwrt_t( ResponseHandler *h ) : handler( h ), status( nullptr )
1331  {
1332  }
1333 
1334  ~pgwrt_t()
1335  {
1336  if( handler )
1337  {
1338  // if all retries were successful no error status was set
1339  if( !status ) status = new XRootDStatus();
1340  handler->HandleResponse( status, nullptr );
1341  }
1342  }
1343 
1344  static size_t GetPgNb( uint64_t pgoff, uint64_t offset, uint32_t fstpglen )
1345  {
1346  if( pgoff == offset ) return 0; // we need this if statement because we operate on unsigned integers
1347  return ( pgoff - ( offset + fstpglen ) ) / XrdSys::PageSize + 1;
1348  }
1349 
1350  inline void SetStatus( XRootDStatus* s )
1351  {
1352  if( !status ) status = s;
1353  else delete s;
1354  }
1355 
1356  ResponseHandler *handler;
1357  XRootDStatus *status;
1358  };
1359  auto pgwrt = std::make_shared<pgwrt_t>( handler );
1360 
1361  int fLen, lLen;
1362  XrdOucPgrwUtils::csNum( offset, size, fLen, lLen );
1363  uint32_t fstpglen = fLen;
1364 
1365  time_t start = ::time( nullptr );
1366  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1367  {
1368  std::unique_ptr<AnyObject> scoped( r );
1369  // if the request failed simply pass the status to the
1370  // user handler
1371  if( !s->IsOK() )
1372  {
1373  pgwrt->SetStatus( s );
1374  return; // pgwrt destructor will call the handler
1375  }
1376  // also if the request was sucessful and there were no
1377  // corrupted pages pass the status to the user handler
1378  RetryInfo *inf = nullptr;
1379  r->Get( inf );
1380  if( !inf->NeedRetry() )
1381  {
1382  pgwrt->SetStatus( s );
1383  return; // pgwrt destructor will call the handler
1384  }
1385  delete s;
1386  // first adjust the timeout value
1387  uint16_t elapsed = ::time( nullptr ) - start;
1388  if( elapsed >= timeout )
1389  {
1390  pgwrt->SetStatus( new XRootDStatus( stError, errOperationExpired ) );
1391  return; // pgwrt destructor will call the handler
1392  }
1393  else timeout -= elapsed;
1394  // retransmit the corrupted pages
1395  for( size_t i = 0; i < inf->Size(); ++i )
1396  {
1397  auto tpl = inf->At( i );
1398  uint64_t pgoff = std::get<0>( tpl );
1399  uint32_t pglen = std::get<1>( tpl );
1400  const void *pgbuf = static_cast<const char*>( buffer ) + ( pgoff - offset );
1401  uint32_t pgdigest = cksums[pgwrt_t::GetPgNb( pgoff, offset, fstpglen )];
1402  auto h = ResponseHandler::Wrap( [=]( XrdCl::XRootDStatus *s, XrdCl::AnyObject *r ) mutable
1403  {
1404  std::unique_ptr<AnyObject> scoped( r );
1405  // if we failed simply set the status
1406  if( !s->IsOK() )
1407  {
1408  pgwrt->SetStatus( s );
1409  return; // the destructor will call the handler
1410  }
1411  delete s;
1412  // otherwise check if the data were not corrupted again
1413  RetryInfo *inf = nullptr;
1414  r->Get( inf );
1415  if( inf->NeedRetry() ) // so we failed in the end
1416  {
1417  DefaultEnv::GetLog()->Warning( FileMsg, "[0x%x@%s] Failed retransmitting corrupted "
1418  "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1419  self->pFileUrl->GetObfuscatedURL().c_str(), pgoff, pglen, pgdigest );
1420  pgwrt->SetStatus( new XRootDStatus( stError, errDataError, 0,
1421  "Failed to retransmit corrupted page" ) );
1422  }
1423  else
1424  DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Succesfuly retransmitted corrupted "
1425  "page: pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1426  self->pFileUrl->GetObfuscatedURL().c_str(), pgoff, pglen, pgdigest );
1427  } );
1428  auto st = PgWriteRetry( self, pgoff, pglen, pgbuf, pgdigest, h, timeout );
1429  if( !st.IsOK() ) pgwrt->SetStatus( new XRootDStatus( st ) );
1430  DefaultEnv::GetLog()->Info( FileMsg, "[0x%x@%s] Retransmitting corrupted page: "
1431  "pgoff=%llu, pglen=%du, pgdigest=%du", self.get(),
1432  self->pFileUrl->GetObfuscatedURL().c_str(), pgoff, pglen, pgdigest );
1433  }
1434  } );
1435 
1436  auto st = PgWriteImpl( self, offset, size, buffer, cksums, 0, h, timeout );
1437  if( !st.IsOK() )
1438  {
1439  pgwrt->handler = nullptr;
1440  delete h;
1441  }
1442  return st;
1443  }
1444 
1445  //------------------------------------------------------------------------
1446  // Write number of pages at a given offset - async
1447  //------------------------------------------------------------------------
1448  XRootDStatus FileStateHandler::PgWriteRetry( std::shared_ptr<FileStateHandler> &self,
1449  uint64_t offset,
1450  uint32_t size,
1451  const void *buffer,
1452  uint32_t digest,
1453  ResponseHandler *handler,
1454  uint16_t timeout )
1455  {
1456  std::vector<uint32_t> cksums{ digest };
1457  return PgWriteImpl( self, offset, size, buffer, cksums, PgReadFlags::Retry, handler, timeout );
1458  }
1459 
1460  //------------------------------------------------------------------------
1461  // Write number of pages at a given offset - async
1462  //------------------------------------------------------------------------
1463  XRootDStatus FileStateHandler::PgWriteImpl( std::shared_ptr<FileStateHandler> &self,
1464  uint64_t offset,
1465  uint32_t size,
1466  const void *buffer,
1467  std::vector<uint32_t> &cksums,
1468  kXR_char flags,
1469  ResponseHandler *handler,
1470  uint16_t timeout )
1471  {
1472  XrdSysMutexHelper scopedLock( self->pMutex );
1473 
1474  if( self->pFileState == Error ) return self->pStatus;
1475 
1476  if( self->pFileState != Opened && self->pFileState != Recovering )
1477  return XRootDStatus( stError, errInvalidOp );
1478 
1479  Log *log = DefaultEnv::GetLog();
1480  log->Debug( FileMsg, "[0x%x@%s] Sending a pgwrite command for handle 0x%x to "
1481  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1482  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1483 
1484  //--------------------------------------------------------------------------
1485  // Create the message
1486  //--------------------------------------------------------------------------
1487  Message *msg;
1488  ClientPgWriteRequest *req;
1489  MessageUtils::CreateRequest( msg, req );
1490 
1491  req->requestid = kXR_pgwrite;
1492  req->offset = offset;
1493  req->dlen = size + cksums.size() * sizeof( uint32_t );
1494  req->reqflags = flags;
1495  memcpy( req->fhandle, self->pFileHandle, 4 );
1496 
1497  ChunkList *list = new ChunkList();
1498  list->push_back( ChunkInfo( offset, size, (char*)buffer ) );
1499 
1500  MessageSendParams params;
1501  params.timeout = timeout;
1502  params.followRedirects = false;
1503  params.stateful = true;
1504  params.chunkList = list;
1505  params.crc32cDigests.swap( cksums );
1506 
1508 
1510  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1511 
1512  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1513  }
1514 
1515  //----------------------------------------------------------------------------
1516  // Commit all pending disk writes - async
1517  //----------------------------------------------------------------------------
1518  XRootDStatus FileStateHandler::Sync( std::shared_ptr<FileStateHandler> &self,
1519  ResponseHandler *handler,
1520  uint16_t timeout )
1521  {
1522  XrdSysMutexHelper scopedLock( self->pMutex );
1523 
1524  if( self->pFileState == Error ) return self->pStatus;
1525 
1526  if( self->pFileState != Opened && self->pFileState != Recovering )
1527  return XRootDStatus( stError, errInvalidOp );
1528 
1529  Log *log = DefaultEnv::GetLog();
1530  log->Debug( FileMsg, "[0x%x@%s] Sending a sync command for handle 0x%x to "
1531  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1532  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1533 
1534  Message *msg;
1535  ClientSyncRequest *req;
1536  MessageUtils::CreateRequest( msg, req );
1537 
1538  req->requestid = kXR_sync;
1539  memcpy( req->fhandle, self->pFileHandle, 4 );
1540 
1541  MessageSendParams params;
1542  params.timeout = timeout;
1543  params.followRedirects = false;
1544  params.stateful = true;
1546 
1548  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1549 
1550  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1551  }
1552 
1553  //----------------------------------------------------------------------------
1554  // Truncate the file to a particular size - async
1555  //----------------------------------------------------------------------------
1556  XRootDStatus FileStateHandler::Truncate( std::shared_ptr<FileStateHandler> &self,
1557  uint64_t size,
1558  ResponseHandler *handler,
1559  uint16_t timeout )
1560  {
1561  XrdSysMutexHelper scopedLock( self->pMutex );
1562 
1563  if( self->pFileState == Error ) return self->pStatus;
1564 
1565  if( self->pFileState != Opened && self->pFileState != Recovering )
1566  return XRootDStatus( stError, errInvalidOp );
1567 
1568  Log *log = DefaultEnv::GetLog();
1569  log->Debug( FileMsg, "[0x%x@%s] Sending a truncate command for handle 0x%x to "
1570  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1571  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1572 
1573  Message *msg;
1574  ClientTruncateRequest *req;
1575  MessageUtils::CreateRequest( msg, req );
1576 
1577  req->requestid = kXR_truncate;
1578  memcpy( req->fhandle, self->pFileHandle, 4 );
1579  req->offset = size;
1580 
1581  MessageSendParams params;
1582  params.timeout = timeout;
1583  params.followRedirects = false;
1584  params.stateful = true;
1586 
1588  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1589 
1590  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1591  }
1592 
1593  //----------------------------------------------------------------------------
1594  // Read scattered data chunks in one operation - async
1595  //----------------------------------------------------------------------------
1596  XRootDStatus FileStateHandler::VectorRead( std::shared_ptr<FileStateHandler> &self,
1597  const ChunkList &chunks,
1598  void *buffer,
1599  ResponseHandler *handler,
1600  uint16_t timeout )
1601  {
1602  //--------------------------------------------------------------------------
1603  // Sanity check
1604  //--------------------------------------------------------------------------
1605  XrdSysMutexHelper scopedLock( self->pMutex );
1606 
1607  if( self->pFileState == Error ) return self->pStatus;
1608 
1609  if( self->pFileState != Opened && self->pFileState != Recovering )
1610  return XRootDStatus( stError, errInvalidOp );
1611 
1612  Log *log = DefaultEnv::GetLog();
1613  log->Debug( FileMsg, "[0x%x@%s] Sending a vector read command for handle "
1614  "0x%x to %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1615  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1616 
1617  //--------------------------------------------------------------------------
1618  // Build the message
1619  //--------------------------------------------------------------------------
1620  Message *msg;
1621  ClientReadVRequest *req;
1622  MessageUtils::CreateRequest( msg, req, sizeof(readahead_list)*chunks.size() );
1623 
1624  req->requestid = kXR_readv;
1625  req->dlen = sizeof(readahead_list)*chunks.size();
1626 
1627  ChunkList *list = new ChunkList();
1628  char *cursor = (char*)buffer;
1629 
1630  //--------------------------------------------------------------------------
1631  // Copy the chunk info
1632  //--------------------------------------------------------------------------
1633  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
1634  for( size_t i = 0; i < chunks.size(); ++i )
1635  {
1636  dataChunk[i].rlen = chunks[i].length;
1637  dataChunk[i].offset = chunks[i].offset;
1638  memcpy( dataChunk[i].fhandle, self->pFileHandle, 4 );
1639 
1640  void *chunkBuffer;
1641  if( cursor )
1642  {
1643  chunkBuffer = cursor;
1644  cursor += chunks[i].length;
1645  }
1646  else
1647  chunkBuffer = chunks[i].buffer;
1648 
1649  list->push_back( ChunkInfo( chunks[i].offset,
1650  chunks[i].length,
1651  chunkBuffer ) );
1652  }
1653 
1654  //--------------------------------------------------------------------------
1655  // Send the message
1656  //--------------------------------------------------------------------------
1657  MessageSendParams params;
1658  params.timeout = timeout;
1659  params.followRedirects = false;
1660  params.stateful = true;
1661  params.chunkList = list;
1663 
1665  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1666 
1667  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1668  }
1669 
1670  //------------------------------------------------------------------------
1671  // Write scattered data chunks in one operation - async
1672  //------------------------------------------------------------------------
1673  XRootDStatus FileStateHandler::VectorWrite( std::shared_ptr<FileStateHandler> &self,
1674  const ChunkList &chunks,
1675  ResponseHandler *handler,
1676  uint16_t timeout )
1677  {
1678  //--------------------------------------------------------------------------
1679  // Sanity check
1680  //--------------------------------------------------------------------------
1681  XrdSysMutexHelper scopedLock( self->pMutex );
1682 
1683  if( self->pFileState == Error ) return self->pStatus;
1684 
1685  if( self->pFileState != Opened && self->pFileState != Recovering )
1686  return XRootDStatus( stError, errInvalidOp );
1687 
1688  Log *log = DefaultEnv::GetLog();
1689  log->Debug( FileMsg, "[0x%x@%s] Sending a vector write command for handle "
1690  "0x%x to %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1691  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1692 
1693  //--------------------------------------------------------------------------
1694  // Determine the size of the payload
1695  //--------------------------------------------------------------------------
1696 
1697  // the size of write vector
1698  uint32_t payloadSize = sizeof(XrdProto::write_list) * chunks.size();
1699 
1700  //--------------------------------------------------------------------------
1701  // Build the message
1702  //--------------------------------------------------------------------------
1703  Message *msg;
1704  ClientWriteVRequest *req;
1705  MessageUtils::CreateRequest( msg, req, payloadSize );
1706 
1707  req->requestid = kXR_writev;
1708  req->dlen = sizeof(XrdProto::write_list) * chunks.size();
1709 
1710  ChunkList *list = new ChunkList();
1711 
1712  //--------------------------------------------------------------------------
1713  // Copy the chunk info
1714  //--------------------------------------------------------------------------
1715  XrdProto::write_list *writeList =
1716  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
1717 
1718 
1719 
1720  for( size_t i = 0; i < chunks.size(); ++i )
1721  {
1722  writeList[i].wlen = chunks[i].length;
1723  writeList[i].offset = chunks[i].offset;
1724  memcpy( writeList[i].fhandle, self->pFileHandle, 4 );
1725 
1726  list->push_back( ChunkInfo( chunks[i].offset,
1727  chunks[i].length,
1728  chunks[i].buffer ) );
1729  }
1730 
1731  //--------------------------------------------------------------------------
1732  // Send the message
1733  //--------------------------------------------------------------------------
1734  MessageSendParams params;
1735  params.timeout = timeout;
1736  params.followRedirects = false;
1737  params.stateful = true;
1738  params.chunkList = list;
1740 
1742  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1743 
1744  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1745  }
1746 
1747  //------------------------------------------------------------------------
1748  // Write scattered buffers in one operation - async
1749  //------------------------------------------------------------------------
1750  XRootDStatus FileStateHandler::WriteV( std::shared_ptr<FileStateHandler> &self,
1751  uint64_t offset,
1752  const struct iovec *iov,
1753  int iovcnt,
1754  ResponseHandler *handler,
1755  uint16_t timeout )
1756  {
      // Builds one kXR_write request for file offset `offset` whose payload is
      // described by the non-empty entries of `iov`, then passes it to
      // SendOrQueue together with a StatefulHandler wrapping `handler`.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 1795 and 1797, so
      // statements may be missing from this view - check the original file.
1757  XrdSysMutexHelper scopedLock( self->pMutex );
1758 
1759  if( self->pFileState == Error ) return self->pStatus;
1760 
1761  if( self->pFileState != Opened && self->pFileState != Recovering )
1762  return XRootDStatus( stError, errInvalidOp );
1763 
1764  Log *log = DefaultEnv::GetLog();
1765  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
1766  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1767  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1768 
1769  Message *msg;
1770  ClientWriteRequest *req;
1771  MessageUtils::CreateRequest( msg, req );
1772 
1773  ChunkList *list = new ChunkList();
1774 
      // Zero-length buffers are skipped; every chunk is recorded with offset 0
      // and only the request itself carries the file offset.
      // NOTE(review): `size` is 32 bits wide, so the sum wraps if the buffers
      // total more than UINT32_MAX bytes - confirm callers bound the input.
1775  uint32_t size = 0;
1776  for( int i = 0; i < iovcnt; ++i )
1777  {
1778  if( iov[i].iov_len == 0 ) continue;
1779  size += iov[i].iov_len;
1780  list->push_back( ChunkInfo( 0, iov[i].iov_len,
1781  (char*)iov[i].iov_base ) );
1782  }
1783 
1784  req->requestid = kXR_write;
1785  req->offset = offset;
1786  req->dlen = size;
1787  memcpy( req->fhandle, self->pFileHandle, 4 );
1788 
      // The chunk list pointer is handed over through the send parameters;
      // where it is released is not visible in this function.
1789  MessageSendParams params;
1790  params.timeout = timeout;
1791  params.followRedirects = false;
1792  params.stateful = true;
1793  params.chunkList = list;
1794 
1796 
1798  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1799 
1800  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1801  }
1802 
1803  //------------------------------------------------------------------------
1804  // Read data into scattered buffers in one operation - async
1805  //------------------------------------------------------------------------
1806  XRootDStatus FileStateHandler::ReadV( std::shared_ptr<FileStateHandler> &self,
1807  uint64_t offset,
1808  struct iovec *iov,
1809  int iovcnt,
1810  ResponseHandler *handler,
1811  uint16_t timeout )
1812  {
      // Builds one kXR_read request (tagged with the virtual request id
      // kXR_virtReadv) that starts at `offset` and spans the summed length of
      // all `iov` entries, then passes it to SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 1849 and 1855, so
      // statements may be missing from this view - check the original file.
1813  XrdSysMutexHelper scopedLock( self->pMutex );
1814 
1815  if( self->pFileState == Error ) return self->pStatus;
1816 
1817  if( self->pFileState != Opened && self->pFileState != Recovering )
1818  return XRootDStatus( stError, errInvalidOp );
1819 
1820  Log *log = DefaultEnv::GetLog();
1821  log->Debug( FileMsg, "[0x%x@%s] Sending a read command for handle 0x%x to "
1822  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1823  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1824 
1825  Message *msg;
1826  ClientReadRequest *req;
1827  MessageUtils::CreateRequest( msg, req );
1828 
      // NOTE(review): the initial value passed to std::accumulate is the int
      // literal 0, so the running total is carried as an int between steps
      // even though the lambda works in size_t - confirm large totals are
      // not expected here.
1829  // calculate the total read size
1830  size_t size = std::accumulate( iov, iov + iovcnt, 0, []( size_t acc, iovec &rhs )
1831  {
1832  return acc + rhs.iov_len;
1833  } );
1834  req->requestid = kXR_read;
1835  req->offset = offset;
1836  req->rlen = size;
1837  msg->SetVirtReqID( kXR_virtReadv );
1838  memcpy( req->fhandle, self->pFileHandle, 4 );
1839 
      // One chunk per iov entry; the chunks are laid out back to back in the
      // file, starting at `offset`. Zero-length entries are not filtered out.
1840  ChunkList *list = new ChunkList();
1841  list->reserve( iovcnt );
1842  uint64_t choff = offset;
1843  for( int i = 0; i < iovcnt; ++i )
1844  {
1845  list->emplace_back( choff, iov[i].iov_len, iov[i].iov_base );
1846  choff += iov[i].iov_len;
1847  }
1848 
1850  MessageSendParams params;
1851  params.timeout = timeout;
1852  params.followRedirects = false;
1853  params.stateful = true;
1854  params.chunkList = list;
1856  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1857 
1858  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1859  }
1860 
1861  //----------------------------------------------------------------------------
1862  // Performs a custom operation on an open file, server implementation
1863  // dependent - async
1864  //----------------------------------------------------------------------------
1865  XRootDStatus FileStateHandler::Fcntl( std::shared_ptr<FileStateHandler> &self,
1866  const Buffer &arg,
1867  ResponseHandler *handler,
1868  uint16_t timeout )
1869  {
      // Builds a kXR_query request with infotype kXR_Qopaqug for the open file
      // handle, attaches the contents of `arg` as the request payload and
      // passes the message to SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 1896 and 1898, so
      // statements may be missing from this view - check the original file.
1870  XrdSysMutexHelper scopedLock( self->pMutex );
1871 
1872  if( self->pFileState == Error ) return self->pStatus;
1873 
1874  if( self->pFileState != Opened && self->pFileState != Recovering )
1875  return XRootDStatus( stError, errInvalidOp );
1876 
1877  Log *log = DefaultEnv::GetLog();
1878  log->Debug( FileMsg, "[0x%x@%s] Sending a fcntl command for handle 0x%x to "
1879  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1880  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1881 
      // The third CreateRequest argument is arg.GetSize() - presumably the
      // extra payload space to allocate; confirm against MessageUtils.
1882  Message *msg;
1883  ClientQueryRequest *req;
1884  MessageUtils::CreateRequest( msg, req, arg.GetSize() );
1885 
1886  req->requestid = kXR_query;
1887  req->infotype = kXR_Qopaqug;
1888  req->dlen = arg.GetSize();
1889  memcpy( req->fhandle, self->pFileHandle, 4 );
      // The argument bytes are placed at offset 24 of the message buffer
      // (presumably right behind the fixed-size request header - TODO confirm).
1890  msg->Append( arg.GetBuffer(), arg.GetSize(), 24 );
1891 
1892  MessageSendParams params;
1893  params.timeout = timeout;
1894  params.followRedirects = false;
1895  params.stateful = true;
1897 
1899  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1900 
1901  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1902  }
1903 
1904  //----------------------------------------------------------------------------
1905  // Get access token to a file - async
1906  //----------------------------------------------------------------------------
1907  XRootDStatus FileStateHandler::Visa( std::shared_ptr<FileStateHandler> &self,
1908  ResponseHandler *handler,
1909  uint16_t timeout )
1910  {
      // Builds a payload-less kXR_query request with infotype kXR_Qvisa for
      // the open file handle and passes it to SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 1935 and 1937, so
      // statements may be missing from this view - check the original file.
1911  XrdSysMutexHelper scopedLock( self->pMutex );
1912 
1913  if( self->pFileState == Error ) return self->pStatus;
1914 
1915  if( self->pFileState != Opened && self->pFileState != Recovering )
1916  return XRootDStatus( stError, errInvalidOp );
1917 
1918  Log *log = DefaultEnv::GetLog();
1919  log->Debug( FileMsg, "[0x%x@%s] Sending a visa command for handle 0x%x to "
1920  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1921  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1922 
1923  Message *msg;
1924  ClientQueryRequest *req;
1925  MessageUtils::CreateRequest( msg, req );
1926 
1927  req->requestid = kXR_query;
1928  req->infotype = kXR_Qvisa;
1929  memcpy( req->fhandle, self->pFileHandle, 4 );
1930 
      // Redirects are not followed for this request and it is marked stateful.
1931  MessageSendParams params;
1932  params.timeout = timeout;
1933  params.followRedirects = false;
1934  params.stateful = true;
1936 
1938  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
1939 
1940  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
1941  }
1942 
1943  //------------------------------------------------------------------------
1944  // Set extended attributes - async
1945  //------------------------------------------------------------------------
1946  XRootDStatus FileStateHandler::SetXAttr( std::shared_ptr<FileStateHandler> &self,
1947  const std::vector<xattr_t> &attrs,
1948  ResponseHandler *handler,
1949  uint16_t timeout )
1950  {
1951  XrdSysMutexHelper scopedLock( self->pMutex );
1952 
1953  if( self->pFileState == Error ) return self->pStatus;
1954 
1955  if( self->pFileState != Opened && self->pFileState != Recovering )
1956  return XRootDStatus( stError, errInvalidOp );
1957 
1958  Log *log = DefaultEnv::GetLog();
1959  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr set command for handle 0x%x to "
1960  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1961  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1962 
1963  //--------------------------------------------------------------------------
1964  // Issue a new fattr get request
1965  //--------------------------------------------------------------------------
1966  return XAttrOperationImpl( self, kXR_fattrSet, 0, attrs, handler, timeout );
1967  }
1968 
1969  //------------------------------------------------------------------------
1970  // Get extended attributes - async
1971  //------------------------------------------------------------------------
1972  XRootDStatus FileStateHandler::GetXAttr( std::shared_ptr<FileStateHandler> &self,
1973  const std::vector<std::string> &attrs,
1974  ResponseHandler *handler,
1975  uint16_t timeout )
1976  {
1977  XrdSysMutexHelper scopedLock( self->pMutex );
1978 
1979  if( self->pFileState == Error ) return self->pStatus;
1980 
1981  if( self->pFileState != Opened && self->pFileState != Recovering )
1982  return XRootDStatus( stError, errInvalidOp );
1983 
1984  Log *log = DefaultEnv::GetLog();
1985  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr get command for handle 0x%x to "
1986  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
1987  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
1988 
1989  //--------------------------------------------------------------------------
1990  // Issue a new fattr get request
1991  //--------------------------------------------------------------------------
1992  return XAttrOperationImpl( self, kXR_fattrGet, 0, attrs, handler, timeout );
1993  }
1994 
1995  //------------------------------------------------------------------------
1996  // Delete extended attributes - async
1997  //------------------------------------------------------------------------
1998  XRootDStatus FileStateHandler::DelXAttr( std::shared_ptr<FileStateHandler> &self,
1999  const std::vector<std::string> &attrs,
2000  ResponseHandler *handler,
2001  uint16_t timeout )
2002  {
2003  XrdSysMutexHelper scopedLock( self->pMutex );
2004 
2005  if( self->pFileState == Error ) return self->pStatus;
2006 
2007  if( self->pFileState != Opened && self->pFileState != Recovering )
2008  return XRootDStatus( stError, errInvalidOp );
2009 
2010  Log *log = DefaultEnv::GetLog();
2011  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr del command for handle 0x%x to "
2012  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2013  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2014 
2015  //--------------------------------------------------------------------------
2016  // Issue a new fattr del request
2017  //--------------------------------------------------------------------------
2018  return XAttrOperationImpl( self, kXR_fattrDel, 0, attrs, handler, timeout );
2019  }
2020 
2021  //------------------------------------------------------------------------
2022  // List extended attributes - async
2023  //------------------------------------------------------------------------
2024  XRootDStatus FileStateHandler::ListXAttr( std::shared_ptr<FileStateHandler> &self,
2025  ResponseHandler *handler,
2026  uint16_t timeout )
2027  {
2028  XrdSysMutexHelper scopedLock( self->pMutex );
2029 
2030  if( self->pFileState == Error ) return self->pStatus;
2031 
2032  if( self->pFileState != Opened && self->pFileState != Recovering )
2033  return XRootDStatus( stError, errInvalidOp );
2034 
2035  Log *log = DefaultEnv::GetLog();
2036  log->Debug( FileMsg, "[0x%x@%s] Sending a fattr list command for handle 0x%x to "
2037  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2038  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2039 
2040  //--------------------------------------------------------------------------
2041  // Issue a new fattr get request
2042  //--------------------------------------------------------------------------
2043  static const std::vector<std::string> nothing;
2044  return XAttrOperationImpl( self, kXR_fattrList, ClientFattrRequest::aData,
2045  nothing, handler, timeout );
2046  }
2047 
2048  //------------------------------------------------------------------------
2058  //------------------------------------------------------------------------
2059  XRootDStatus FileStateHandler::Checkpoint( std::shared_ptr<FileStateHandler> &self,
2060  kXR_char code,
2061  ResponseHandler *handler,
2062  uint16_t timeout )
2063  {
      // Builds a kXR_chkpoint request for the open file handle whose opcode is
      // the caller-supplied `code`, and passes it to SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 2089 and 2091, so
      // statements may be missing from this view - check the original file.
2064  XrdSysMutexHelper scopedLock( self->pMutex );
2065 
2066  if( self->pFileState == Error ) return self->pStatus;
2067 
2068  if( self->pFileState != Opened && self->pFileState != Recovering )
2069  return XRootDStatus( stError, errInvalidOp );
2070 
2071  Log *log = DefaultEnv::GetLog();
2072  log->Debug( FileMsg, "[0x%x@%s] Sending a checkpoint command for "
2073  "handle 0x%x to %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2074  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2075 
2076  Message *msg;
2077  ClientChkPointRequest *req;
2078  MessageUtils::CreateRequest( msg, req );
2079 
      // `code` is copied into the request unchanged; it is not validated here.
2080  req->requestid = kXR_chkpoint;
2081  req->opcode = code;
2082  memcpy( req->fhandle, self->pFileHandle, 4 );
2083 
2084  MessageSendParams params;
2085  params.timeout = timeout;
2086  params.followRedirects = false;
2087  params.stateful = true;
2088 
2090 
2092  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2093 
2094  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2095  }
2096 
2097  //------------------------------------------------------------------------
2107  //------------------------------------------------------------------------
2108  XRootDStatus FileStateHandler::ChkptWrt( std::shared_ptr<FileStateHandler> &self,
2109  uint64_t offset,
2110  uint32_t size,
2111  const void *buffer,
2112  ResponseHandler *handler,
2113  uint16_t timeout )
2114  {
      // Builds a kXR_chkpoint request with opcode kXR_ckpXeq that carries a
      // kXR_write sub-request (`wrtreq`) for `size` bytes at `offset`; the
      // data in `buffer` travels as a single chunk. The message is then
      // passed to SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 2136, 2151 and 2153.
      // In particular the declaration of `wrtreq` is not visible in this
      // view - check the original file.
2115  XrdSysMutexHelper scopedLock( self->pMutex );
2116 
2117  if( self->pFileState == Error ) return self->pStatus;
2118 
2119  if( self->pFileState != Opened && self->pFileState != Recovering )
2120  return XRootDStatus( stError, errInvalidOp );
2121 
      // NOTE(review): the log text says "write command" although the request
      // built below is a kXR_chkpoint.
2122  Log *log = DefaultEnv::GetLog();
2123  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2124  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2125  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2126 
      // sizeof( ClientWriteRequest ) is passed as the third CreateRequest
      // argument - presumably room for the embedded write request; confirm
      // against MessageUtils.
2127  Message *msg;
2128  ClientChkPointRequest *req;
2129  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2130 
2131  req->requestid = kXR_chkpoint;
2132  req->opcode = kXR_ckpXeq;
2133  req->dlen = 24; // as specified in the protocol specification
2134  memcpy( req->fhandle, self->pFileHandle, 4 );
2135 
2137  wrtreq->requestid = kXR_write;
2138  wrtreq->offset = offset;
2139  wrtreq->dlen = size;
2140  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2141 
      // The chunk is recorded with offset 0; the file offset lives in wrtreq.
      // The const qualifier of `buffer` is cast away to fit ChunkInfo.
2142  ChunkList *list = new ChunkList();
2143  list->push_back( ChunkInfo( 0, size, (char*)buffer ) );
2144 
2145  MessageSendParams params;
2146  params.timeout = timeout;
2147  params.followRedirects = false;
2148  params.stateful = true;
2149  params.chunkList = list;
2150 
2152 
2154  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2155 
2156  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2157  }
2158 
2159  //------------------------------------------------------------------------
2169  //------------------------------------------------------------------------
2170  XRootDStatus FileStateHandler::ChkptWrtV( std::shared_ptr<FileStateHandler> &self,
2171  uint64_t offset,
2172  const struct iovec *iov,
2173  int iovcnt,
2174  ResponseHandler *handler,
2175  uint16_t timeout )
2176  {
      // Builds a kXR_chkpoint request with opcode kXR_ckpXeq that carries a
      // kXR_write sub-request (`wrtreq`) at `offset`; the payload is described
      // by the non-empty entries of `iov`. The message is then passed to
      // SendOrQueue.
      //
      // Returns self->pStatus when the file is in the Error state, an
      // errInvalidOp error when it is neither Opened nor Recovering, and the
      // result of SendOrQueue otherwise.
      //
      // NOTE(review): the embedded listing numbers skip 2208, 2220 and 2222.
      // In particular the declaration of `wrtreq` is not visible in this
      // view - check the original file.
2177  XrdSysMutexHelper scopedLock( self->pMutex );
2178 
2179  if( self->pFileState == Error ) return self->pStatus;
2180 
2181  if( self->pFileState != Opened && self->pFileState != Recovering )
2182  return XRootDStatus( stError, errInvalidOp );
2183 
      // NOTE(review): the log text says "write command" although the request
      // built below is a kXR_chkpoint.
2184  Log *log = DefaultEnv::GetLog();
2185  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
2186  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2187  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
2188 
2189  Message *msg;
2190  ClientChkPointRequest *req;
2191  MessageUtils::CreateRequest( msg, req, sizeof( ClientWriteRequest ) );
2192 
2193  req->requestid = kXR_chkpoint;
2194  req->opcode = kXR_ckpXeq;
2195  req->dlen = 24; // as specified in the protocol specification
2196  memcpy( req->fhandle, self->pFileHandle, 4 );
2197 
      // Zero-length buffers are skipped; every chunk is recorded with offset 0.
      // NOTE(review): `size` is 32 bits wide, so the sum wraps if the buffers
      // total more than UINT32_MAX bytes - confirm callers bound the input.
2198  ChunkList *list = new ChunkList();
2199  uint32_t size = 0;
2200  for( int i = 0; i < iovcnt; ++i )
2201  {
2202  if( iov[i].iov_len == 0 ) continue;
2203  size += iov[i].iov_len;
2204  list->push_back( ChunkInfo( 0, iov[i].iov_len,
2205  (char*)iov[i].iov_base ) );
2206  }
2207 
2209  wrtreq->requestid = kXR_write;
2210  wrtreq->offset = offset;
2211  wrtreq->dlen = size;
2212  memcpy( wrtreq->fhandle, self->pFileHandle, 4 );
2213 
2214  MessageSendParams params;
2215  params.timeout = timeout;
2216  params.followRedirects = false;
2217  params.stateful = true;
2218  params.chunkList = list;
2219 
2221 
2223  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2224 
2225  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2226  }
2227 
2228  //----------------------------------------------------------------------------
2229  // Check if the file is open
2230  //----------------------------------------------------------------------------
2232  {
2233  XrdSysMutexHelper scopedLock( pMutex );
2234 
2235  if( pFileState == Opened || pFileState == Recovering )
2236  return true;
2237  return false;
2238  }
2239 
2240  //----------------------------------------------------------------------------
2241  // Set file property
2242  //----------------------------------------------------------------------------
2243  bool FileStateHandler::SetProperty( const std::string &name,
2244  const std::string &value )
2245  {
2246  XrdSysMutexHelper scopedLock( pMutex );
2247  if( name == "ReadRecovery" )
2248  {
2249  if( value == "true" ) pDoRecoverRead = true;
2250  else pDoRecoverRead = false;
2251  return true;
2252  }
2253  else if( name == "WriteRecovery" )
2254  {
2255  if( value == "true" ) pDoRecoverWrite = true;
2256  else pDoRecoverWrite = false;
2257  return true;
2258  }
2259  else if( name == "FollowRedirects" )
2260  {
2261  if( value == "true" ) pFollowRedirects = true;
2262  else pFollowRedirects = false;
2263  return true;
2264  }
2265  else if( name == "BundledClose" )
2266  {
2267  if( value == "true" ) pAllowBundledClose = true;
2268  else pAllowBundledClose = false;
2269  return true;
2270  }
2271  return false;
2272  }
2273 
2274  //----------------------------------------------------------------------------
2275  // Get file property
2276  //----------------------------------------------------------------------------
2277  bool FileStateHandler::GetProperty( const std::string &name,
2278  std::string &value ) const
2279  {
2280  XrdSysMutexHelper scopedLock( pMutex );
2281  if( name == "ReadRecovery" )
2282  {
2283  if( pDoRecoverRead ) value = "true";
2284  else value = "false";
2285  return true;
2286  }
2287  else if( name == "WriteRecovery" )
2288  {
2289  if( pDoRecoverWrite ) value = "true";
2290  else value = "false";
2291  return true;
2292  }
2293  else if( name == "FollowRedirects" )
2294  {
2295  if( pFollowRedirects ) value = "true";
2296  else value = "false";
2297  return true;
2298  }
2299  else if( name == "DataServer" && pDataServer )
2300  { value = pDataServer->GetHostId(); return true; }
2301  else if( name == "LastURL" && pDataServer )
2302  { value = pDataServer->GetURL(); return true; }
2303  else if( name == "WrtRecoveryRedir" && pWrtRecoveryRedir )
2304  { value = pWrtRecoveryRedir->GetHostId(); return true; }
2305  value = "";
2306  return false;
2307  }
2308 
2309  //----------------------------------------------------------------------------
2310  // Process the results of the opening operation
2311  //----------------------------------------------------------------------------
2313  const OpenInfo *openInfo,
2314  const HostList *hostList )
2315  {
2316  Log *log = DefaultEnv::GetLog();
2317  XrdSysMutexHelper scopedLock( pMutex );
2318 
2319  //--------------------------------------------------------------------------
2320  // Assign the data server and the load balancer
2321  //--------------------------------------------------------------------------
2322  std::string lastServer = pFileUrl->GetHostId();
2323  if( hostList )
2324  {
2325  delete pDataServer;
2326  delete pLoadBalancer;
2327  pLoadBalancer = 0;
2328  delete pWrtRecoveryRedir;
2329  pWrtRecoveryRedir = 0;
2330 
2331  pDataServer = new URL( hostList->back().url );
2332  pDataServer->SetParams( pFileUrl->GetParams() );
2333  if( !( pUseVirtRedirector && pFileUrl->IsMetalink() ) ) pDataServer->SetPath( pFileUrl->GetPath() );
2334  lastServer = pDataServer->GetHostId();
2335  HostList::const_iterator itC;
2336  URL::ParamsMap params = pDataServer->GetParams();
2337  for( itC = hostList->begin(); itC != hostList->end(); ++itC )
2338  {
2339  MessageUtils::MergeCGI( params,
2340  itC->url.GetParams(),
2341  true );
2342  }
2343  pDataServer->SetParams( params );
2344 
2345  HostList::const_reverse_iterator it;
2346  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2347  if( it->loadBalancer )
2348  {
2349  pLoadBalancer = new URL( it->url );
2350  break;
2351  }
2352 
2353  for( it = hostList->rbegin(); it != hostList->rend(); ++it )
2354  if( it->flags & kXR_recoverWrts )
2355  {
2356  pWrtRecoveryRedir = new URL( it->url );
2357  break;
2358  }
2359  }
2360 
2361  log->Debug(FileMsg, "[0x%x@%s] Open has returned with status %s",
2362  this, pFileUrl->GetObfuscatedURL().c_str(), status->ToStr().c_str() );
2363 
2364  if( pDataServer && !pDataServer->IsLocalFile() )
2365  {
2366  //------------------------------------------------------------------------
2367  // Check if we are using a secure connection
2368  //------------------------------------------------------------------------
2369  XrdCl::AnyObject isencobj;
2371  QueryTransport( *pDataServer, XRootDQuery::IsEncrypted, isencobj );
2372  if( st.IsOK() )
2373  {
2374  bool *isenc;
2375  isencobj.Get( isenc );
2376  pIsChannelEncrypted = *isenc;
2377  delete isenc;
2378  }
2379  }
2380 
2381  //--------------------------------------------------------------------------
2382  // We have failed
2383  //--------------------------------------------------------------------------
2384  pStatus = *status;
2385  if( !pStatus.IsOK() || !openInfo )
2386  {
2387  log->Debug(FileMsg, "[0x%x@%s] Error while opening at %s: %s",
2388  this, pFileUrl->GetObfuscatedURL().c_str(), lastServer.c_str(),
2389  pStatus.ToStr().c_str() );
2390  FailQueuedMessages( pStatus );
2391  pFileState = Error;
2392 
2393  //------------------------------------------------------------------------
2394  // Report to monitoring
2395  //------------------------------------------------------------------------
2397  if( mon )
2398  {
2400  i.file = pFileUrl;
2401  i.status = status;
2403  mon->Event( Monitor::EvErrIO, &i );
2404  }
2405  }
2406  //--------------------------------------------------------------------------
2407  // We have succeeded
2408  //--------------------------------------------------------------------------
2409  else
2410  {
2411  //------------------------------------------------------------------------
2412  // Store the response info
2413  //------------------------------------------------------------------------
2414  openInfo->GetFileHandle( pFileHandle );
2415  pSessionId = openInfo->GetSessionId();
2416  if( openInfo->GetStatInfo() )
2417  {
2418  delete pStatInfo;
2419  pStatInfo = new StatInfo( *openInfo->GetStatInfo() );
2420  }
2421 
2422  log->Debug( FileMsg, "[0x%x@%s] successfully opened at %s, handle: 0x%x, "
2423  "session id: %ld", this, pFileUrl->GetObfuscatedURL().c_str(),
2424  pDataServer->GetHostId().c_str(), *((uint32_t*)pFileHandle),
2425  pSessionId );
2426 
2427  //------------------------------------------------------------------------
2428  // Inform the monitoring about opening success
2429  //------------------------------------------------------------------------
2430  gettimeofday( &pOpenTime, 0 );
2432  if( mon )
2433  {
2435  i.file = pFileUrl;
2436  i.dataServer = pDataServer->GetHostId();
2437  i.oFlags = pOpenFlags;
2438  i.fSize = pStatInfo ? pStatInfo->GetSize() : 0;
2439  mon->Event( Monitor::EvOpen, &i );
2440  }
2441 
2442  //------------------------------------------------------------------------
2443  // Resend the queued messages if any
2444  //------------------------------------------------------------------------
2445  ReSendQueuedMessages();
2446  pFileState = Opened;
2447  }
2448  }
2449 
2450  //----------------------------------------------------------------------------
2451  // Process the results of the closing operation
2452  //----------------------------------------------------------------------------
2454  {
2455  Log *log = DefaultEnv::GetLog();
2456  XrdSysMutexHelper scopedLock( pMutex );
2457 
2458  log->Debug(FileMsg, "[0x%x@%s] Close returned from %s with: %s", this,
2459  pFileUrl->GetObfuscatedURL().c_str(), pDataServer->GetHostId().c_str(),
2460  status->ToStr().c_str() );
2461 
2462  log->Dump(FileMsg, "[0x%x@%s] Items in the fly %d, queued for recovery %d",
2463  this, pFileUrl->GetObfuscatedURL().c_str(), pInTheFly.size(),
2464  pToBeRecovered.size() );
2465 
2466  MonitorClose( status );
2467  ResetMonitoringVars();
2468 
2469  pStatus = *status;
2470  pFileState = Closed;
2471  }
2472 
2473  //----------------------------------------------------------------------------
2474  // Handle an error while sending a stateful message
2475  //----------------------------------------------------------------------------
2476  void FileStateHandler::OnStateError( std::shared_ptr<FileStateHandler> &self,
2477  XRootDStatus *status,
2478  Message *message,
2479  ResponseHandler *userHandler,
2480  MessageSendParams &sendParams )
2481  {
      // Reacts to a failed stateful request. A redirect error is handed to
      // OnStateRedirection; any other error is reported to monitoring and the
      // request is then either failed for good (FailMessage) or queued for
      // recovery (RecoverMessage).
      //
      // `status` is deleted on both non-redirect exit paths below.
      // NOTE(review): on the redirect path this function returns without
      // deleting `status` - confirm who owns it in that case.
      //
      // NOTE(review): the embedded listing numbers skip 2515, 2518 and
      // 2526-2530, so the declarations of `mon` and `i` and some switch cases
      // are not visible in this view - check the original file.
2482  //--------------------------------------------------------------------------
2483  // It may be a redirection
2484  //--------------------------------------------------------------------------
2485  if( !status->IsOK() && status->code == errRedirect && self->pFollowRedirects )
2486  {
      // The error message is treated as a redirect target only when it starts
      // with one of these scheme names. Any string matching the "roots" or
      // "xroots" prefix already matches "root" or "xroot".
2487  static const std::string root = "root", xroot = "xroot", file = "file",
2488  roots = "roots", xroots = "xroots";
2489  std::string msg = status->GetErrorMessage();
2490  if( !msg.compare( 0, root.size(), root ) ||
2491  !msg.compare( 0, xroot.size(), xroot ) ||
2492  !msg.compare( 0, file.size(), file ) ||
2493  !msg.compare( 0, roots.size(), roots ) ||
2494  !msg.compare( 0, xroots.size(), xroots ) )
2495  {
2496  FileStateHandler::OnStateRedirection( self, msg, message, userHandler, sendParams );
2497  return;
2498  }
2499  }
2500 
2501  //--------------------------------------------------------------------------
2502  // Handle error
2503  //--------------------------------------------------------------------------
      // The mutex is only taken from here on; the redirect branch above runs
      // without it and OnStateRedirection takes the lock itself.
2504  Log *log = DefaultEnv::GetLog();
2505  XrdSysMutexHelper scopedLock( self->pMutex );
2506  self->pInTheFly.erase( message );
2507 
2508  log->Dump( FileMsg, "[0x%x@%s] File state error encountered. Message %s "
2509  "returned with %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2510  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2511 
2512  //--------------------------------------------------------------------------
2513  // Report to monitoring
2514  //--------------------------------------------------------------------------
2516  if( mon )
2517  {
2519  i.file = self->pFileUrl;
2520  i.status = status;
2521 
      // The request id stored in the message header selects the monitoring
      // opcode; ids without a visible case are reported as ErrUnc.
2522  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2523  switch( req->header.requestid )
2524  {
2525  case kXR_read: i.opCode = Monitor::ErrorInfo::ErrRead; break;
2531  default: i.opCode = Monitor::ErrorInfo::ErrUnc;
2532  }
2533 
2534  mon->Event( Monitor::EvErrIO, &i );
2535  }
2536 
2537  //--------------------------------------------------------------------------
2538  // The message is not recoverable
2539  // (message using a kernel buffer is not recoverable by definition)
2540  //--------------------------------------------------------------------------
2541  if( !self->IsRecoverable( *status ) || sendParams.kbuff )
2542  {
2543  log->Error( FileMsg, "[0x%x@%s] Fatal file state error. Message %s "
2544  "returned with %s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2545  message->GetObfuscatedDescription().c_str(), status->ToStr().c_str() );
2546 
2547  self->FailMessage( RequestData( message, userHandler, sendParams ), *status );
2548  delete status;
2549  return;
2550  }
2551 
2552  //--------------------------------------------------------------------------
2553  // Insert the message to the recovery queue and start the recovery
2554  // procedure if we don't have any more message in the fly
2555  //--------------------------------------------------------------------------
      // The triggering status is copied into pCloseReason before it is deleted.
2556  self->pCloseReason = *status;
2557  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2558  delete status;
2559  }
2560 
2561  //----------------------------------------------------------------------------
2562  // Handle stateful redirect
2563  //----------------------------------------------------------------------------
2564  void FileStateHandler::OnStateRedirection( std::shared_ptr<FileStateHandler> &self,
2565  const std::string &redirectUrl,
2566  Message *message,
2567  ResponseHandler *userHandler,
2568  MessageSendParams &sendParams )
2569  {
2570  XrdSysMutexHelper scopedLock( self->pMutex );
2571  self->pInTheFly.erase( message );
2572 
2573  //--------------------------------------------------------------------------
2574  // Register the state redirect url and append the new cgi information to
2575  // the file URL
2576  //--------------------------------------------------------------------------
2577  if( !self->pStateRedirect )
2578  {
2579  std::ostringstream o;
2580  self->pStateRedirect = new URL( redirectUrl );
2581  URL::ParamsMap params = self->pFileUrl->GetParams();
2582  MessageUtils::MergeCGI( params,
2583  self->pStateRedirect->GetParams(),
2584  false );
2585  self->pFileUrl->SetParams( params );
2586  }
2587 
2588  RecoverMessage( self, RequestData( message, userHandler, sendParams ) );
2589  }
2590 
2591  //----------------------------------------------------------------------------
2592  // Handle stateful response
2593  //----------------------------------------------------------------------------
2594  void FileStateHandler::OnStateResponse( std::shared_ptr<FileStateHandler> &self,
2595  XRootDStatus *status,
2596  Message *message,
2597  AnyObject *response,
2598  HostList */*urlList*/ )
2599  {
2600  Log *log = DefaultEnv::GetLog();
2601  XrdSysMutexHelper scopedLock( self->pMutex );
2602 
2603  log->Dump( FileMsg, "[0x%x@%s] Got state response for message %s",
2604  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2605  message->GetObfuscatedDescription().c_str() );
2606 
2607  //--------------------------------------------------------------------------
2608  // Since this message may be the last "in-the-fly" and no recovery
2609  // is done if messages are in the fly, we may need to trigger recovery
2610  //--------------------------------------------------------------------------
2611  self->pInTheFly.erase( message );
2612  RunRecovery( self );
2613 
2614  //--------------------------------------------------------------------------
2615  // Play with the actual response before returning it. This is a good
2616  // place to do caching in the future.
2617  //--------------------------------------------------------------------------
2618  ClientRequest *req = (ClientRequest*)message->GetBuffer();
2619  switch( req->header.requestid )
2620  {
2621  //------------------------------------------------------------------------
2622  // Cache the stat response
2623  //------------------------------------------------------------------------
2624  case kXR_stat:
2625  {
2626  StatInfo *info = 0;
2627  response->Get( info );
2628  delete self->pStatInfo;
2629  self->pStatInfo = new StatInfo( *info );
2630  break;
2631  }
2632 
2633  //------------------------------------------------------------------------
2634  // Handle read response
2635  //------------------------------------------------------------------------
2636  case kXR_read:
2637  {
2638  ++self->pRCount;
2639  self->pRBytes += req->read.rlen;
2640  break;
2641  }
2642 
2643  //------------------------------------------------------------------------
2644  // Handle read response
2645  //------------------------------------------------------------------------
2646  case kXR_pgread:
2647  {
2648  ++self->pRCount;
2649  self->pRBytes += req->pgread.rlen;
2650  break;
2651  }
2652 
2653  //------------------------------------------------------------------------
2654  // Handle readv response
2655  //------------------------------------------------------------------------
2656  case kXR_readv:
2657  {
2658  ++self->pVRCount;
2659  size_t segs = req->header.dlen/sizeof(readahead_list);
2660  readahead_list *dataChunk = (readahead_list*)message->GetBuffer( 24 );
2661  for( size_t i = 0; i < segs; ++i )
2662  self->pVRBytes += dataChunk[i].rlen;
2663  self->pVSegs += segs;
2664  break;
2665  }
2666 
2667  //------------------------------------------------------------------------
2668  // Handle write response
2669  //------------------------------------------------------------------------
2670  case kXR_write:
2671  {
2672  ++self->pWCount;
2673  self->pWBytes += req->write.dlen;
2674  break;
2675  }
2676 
2677  //------------------------------------------------------------------------
2678  // Handle write response
2679  //------------------------------------------------------------------------
2680  case kXR_pgwrite:
2681  {
2682  ++self->pWCount;
2683  self->pWBytes += req->pgwrite.dlen;
2684  break;
2685  }
2686 
2687  //------------------------------------------------------------------------
2688  // Handle writev response
2689  //------------------------------------------------------------------------
2690  case kXR_writev:
2691  {
2692  ++self->pVWCount;
2693  size_t size = req->header.dlen/sizeof(readahead_list);
2694  XrdProto::write_list *wrtList =
2695  reinterpret_cast<XrdProto::write_list*>( message->GetBuffer( 24 ) );
2696  for( size_t i = 0; i < size; ++i )
2697  self->pVWBytes += wrtList[i].wlen;
2698  break;
2699  }
2700  };
2701  }
2702 
2703  //------------------------------------------------------------------------
2705  //------------------------------------------------------------------------
2706  void FileStateHandler::Tick( time_t now )
2707  {
2708  if (pMutex.CondLock())
2709  {TimeOutRequests( now );
2710  pMutex.UnLock();
2711  }
2712  }
2713 
2714  //----------------------------------------------------------------------------
2715  // Declare timeout on requests being recovered
      //
      // Walks pToBeRecovered and, for every entry whose params.expires is not
      // later than `now`, queues a ResponseJob for the entry's handler on jobMan
      // and removes the entry from the list. Entries that have not expired yet
      // stay queued.
      //
      // NOTE(review): the numbering embedded in this listing skips 2717, 2725 and
      // 2732, so the function signature, the declaration of jobMan and one of the
      // ResponseJob constructor arguments are not visible here -- confirm them
      // against the real file.
2716  //----------------------------------------------------------------------------
2718  {
      // Nothing is logged or scanned when no request is waiting for recovery
2719  if( !pToBeRecovered.empty() )
2720  {
2721  Log *log = DefaultEnv::GetLog();
2722  log->Dump( FileMsg, "[0x%x@%s] Got a timer event", this,
2723  pFileUrl->GetObfuscatedURL().c_str() );
2724  RequestList::iterator it;
      // The for header has no increment: erase() hands back the next iterator,
      // and entries that are kept are stepped over explicitly in the else branch
2726  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); )
2727  {
2728  if( it->params.expires <= now )
2729  {
2730  jobMan->QueueJob( new ResponseJob(
2731  it->handler,
2733  0, it->params.hostList ) );
2734  it = pToBeRecovered.erase( it );
2735  }
2736  else
2737  ++it;
2738  }
2739  }
2740  }
2741 
2742  //----------------------------------------------------------------------------
2743  // Called in the child process after the fork
      //
      // Files that are Closed or in the Error state are left untouched. For any
      // other state the file is either put into the Recovering state -- when
      // recovery is enabled for its access mode (pDoRecoverRead for read-only
      // files, pDoRecoverWrite otherwise) -- or marked as Error.
      //
      // NOTE(review): the numbering embedded in this listing skips 2745, so the
      // signature line is not visible here.
2744  //----------------------------------------------------------------------------
2746  {
2747  Log *log = DefaultEnv::GetLog();
2748 
      // Nothing to re-establish for files that are not in use
2749  if( pFileState == Closed || pFileState == Error )
2750  return;
2751 
2752  if( (IsReadOnly() && pDoRecoverRead) ||
2753  (!IsReadOnly() && pDoRecoverWrite) )
2754  {
2755  log->Debug( FileMsg, "[0x%x@%s] Putting the file in recovery state in "
2756  "process %d", this, pFileUrl->GetObfuscatedURL().c_str(), getpid() );
2757  pFileState = Recovering;
      // Both request collections are simply emptied here; nothing is failed
      // or re-sent for the entries they held
2758  pInTheFly.clear();
2759  pToBeRecovered.clear();
2760  }
2761  else
2762  pFileState = Error;
2763  }
2764 
2765  //------------------------------------------------------------------------
2766  // Try other data server
2767  //------------------------------------------------------------------------
2768  XRootDStatus FileStateHandler::TryOtherServer( std::shared_ptr<FileStateHandler> &self, uint16_t timeout )
2769  {
2770  XrdSysMutexHelper scopedLock( self->pMutex );
2771 
2772  if( self->pFileState != Opened || !self->pLoadBalancer )
2773  return XRootDStatus( stError, errInvalidOp );
2774 
2775  self->pFileState = Recovering;
2776 
2777  Log *log = DefaultEnv::GetLog();
2778  log->Debug( FileMsg, "[0x%x@%s] Reopen file at next data server.",
2779  self.get(), self->pFileUrl->GetObfuscatedURL().c_str() );
2780 
2781  // merge CGI
2782  auto lbcgi = self->pLoadBalancer->GetParams();
2783  auto dtcgi = self->pDataServer->GetParams();
2784  MessageUtils::MergeCGI( lbcgi, dtcgi, false );
2785  // update tried CGI
2786  auto itr = lbcgi.find( "tried" );
2787  if( itr == lbcgi.end() )
2788  lbcgi["tried"] = self->pDataServer->GetHostName();
2789  else
2790  {
2791  std::string tried = itr->second;
2792  tried += "," + self->pDataServer->GetHostName();
2793  lbcgi["tried"] = tried;
2794  }
2795  self->pLoadBalancer->SetParams( lbcgi );
2796 
2797  return ReOpenFileAtServer( self, *self->pLoadBalancer, timeout );
2798  }
2799 
2800  //------------------------------------------------------------------------
2801  // Generic implementation of xattr operation
      //
      // Builds a kXR_fattr request for the file handle currently held in
      // pFileHandle, lets MessageUtils::CreateXAttrBody fill in the body from
      // `attrs`, wraps the user handler in a StatefulHandler and passes the
      // message to SendOrQueue.
      //
      // subcode / options are copied verbatim into the request; numattr is set
      // to attrs.size().
      //
      // Returns the status of CreateXAttrBody when building the body fails,
      // otherwise the status of SendOrQueue.
      //
      // NOTE(review): the numbering embedded in this listing skips 2830 and 2832,
      // so two statements between the parameter setup and the creation of the
      // StatefulHandler are not visible here.
2802  //------------------------------------------------------------------------
2803  template<typename T>
2804  Status FileStateHandler::XAttrOperationImpl( std::shared_ptr<FileStateHandler> &self,
2805  kXR_char subcode,
2806  kXR_char options,
2807  const std::vector<T> &attrs,
2808  ResponseHandler *handler,
2809  uint16_t timeout )
2810  {
2811  //--------------------------------------------------------------------------
2812  // Issue a new fattr request
2813  //--------------------------------------------------------------------------
2814  Message *msg;
2815  ClientFattrRequest *req;
2816  MessageUtils::CreateRequest( msg, req );
2817 
2818  req->requestid = kXR_fattr;
2819  req->subcode = subcode;
2820  req->numattr = attrs.size();
2821  req->options = options;
2822  memcpy( req->fhandle, self->pFileHandle, 4 );
2823  XRootDStatus st = MessageUtils::CreateXAttrBody( msg, attrs );
      // NOTE(review): msg does not appear to be released on this early return --
      // confirm who owns it when CreateXAttrBody fails
2824  if( !st.IsOK() ) return st;
2825 
2826  MessageSendParams params;
2827  params.timeout = timeout;
2828  params.followRedirects = false;
2829  params.stateful = true;
2831 
2833  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
2834 
2835  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
2836  }
2837 
2838  //----------------------------------------------------------------------------
2839  // Send a message to a host or put it in the recovery queue
2840  //----------------------------------------------------------------------------
2841  Status FileStateHandler::SendOrQueue( std::shared_ptr<FileStateHandler> &self,
2842  const URL &url,
2843  Message *msg,
2844  ResponseHandler *handler,
2845  MessageSendParams &sendParams )
2846  {
2847  //--------------------------------------------------------------------------
2848  // Recovering
2849  //--------------------------------------------------------------------------
2850  if( self->pFileState == Recovering )
2851  {
2852  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2853  }
2854 
2855  //--------------------------------------------------------------------------
2856  // Trying to send
2857  //--------------------------------------------------------------------------
2858  if( self->pFileState == Opened )
2859  {
2860  msg->SetSessionId( self->pSessionId );
2861  XRootDStatus st = self->IssueRequest( *self->pDataServer, msg, handler, sendParams );
2862 
2863  //------------------------------------------------------------------------
2864  // Invalid session id means that the connection has been broken while we
2865  // were idle so we haven't been informed about this fact earlier.
2866  //------------------------------------------------------------------------
2867  if( !st.IsOK() && st.code == errInvalidSession && self->IsRecoverable( st ) )
2868  return RecoverMessage( self, RequestData( msg, handler, sendParams ), false );
2869 
2870  if( st.IsOK() )
2871  self->pInTheFly.insert(msg);
2872  else
2873  delete handler;
2874  return st;
2875  }
2876  return Status( stError, errInvalidOp );
2877  }
2878 
2879  //----------------------------------------------------------------------------
2880  // Check if the stateful error is recoverable
      //
      // Returns true only when status.code is one of the codes listed in
      // recoverable_errors AND recovery is enabled for the way the file was
      // opened: pDoRecoverRead for read-only files, pDoRecoverWrite otherwise.
      //
      // NOTE(review): the numbering embedded in this listing skips 2885-2887 and
      // 2890, so the list of recoverable error codes shown below is incomplete.
2881  //----------------------------------------------------------------------------
2882  bool FileStateHandler::IsRecoverable( const XRootDStatus &status ) const
2883  {
2884  const auto recoverable_errors = {
2888  errInternal,
2889  errTlsError,
2891  };
2892 
      // The error list is not searched at all when both recovery switches are off
2893  if (pDoRecoverRead || pDoRecoverWrite)
2894  for (const auto error : recoverable_errors)
2895  if (status.code == error)
2896  return IsReadOnly() ? pDoRecoverRead : pDoRecoverWrite;
2897 
2898  return false;
2899  }
2900 
2901  //----------------------------------------------------------------------------
2902  // Check if the file is open for read only
2903  //----------------------------------------------------------------------------
2904  bool FileStateHandler::IsReadOnly() const
2905  {
2906  if( (pOpenFlags & kXR_open_read) && !(pOpenFlags & kXR_open_updt) &&
2907  !(pOpenFlags & kXR_open_apnd ) )
2908  return true;
2909  return false;
2910  }
2911 
2912  //----------------------------------------------------------------------------
2913  // Recover a message
2914  //----------------------------------------------------------------------------
2915  Status FileStateHandler::RecoverMessage( std::shared_ptr<FileStateHandler> &self,
2916  RequestData rd,
2917  bool callbackOnFailure )
2918  {
2919  self->pFileState = Recovering;
2920 
2921  Log *log = DefaultEnv::GetLog();
2922  log->Dump( FileMsg, "[0x%x@%s] Putting message %s in the recovery list",
2923  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
2924  rd.request->GetObfuscatedDescription().c_str() );
2925 
2926  Status st = RunRecovery( self );
2927  if( st.IsOK() )
2928  {
2929  self->pToBeRecovered.push_back( rd );
2930  return st;
2931  }
2932 
2933  if( callbackOnFailure )
2934  self->FailMessage( rd, st );
2935 
2936  return st;
2937  }
2938 
2939  //----------------------------------------------------------------------------
2940  // Run the recovery procedure if appropriate
2941  //----------------------------------------------------------------------------
2942  Status FileStateHandler::RunRecovery( std::shared_ptr<FileStateHandler> &self )
2943  {
2944  if( self->pFileState != Recovering )
2945  return Status();
2946 
2947  if( !self->pInTheFly.empty() )
2948  return Status();
2949 
2950  Log *log = DefaultEnv::GetLog();
2951  log->Debug( FileMsg, "[0x%x@%s] Running the recovery procedure", self.get(),
2952  self->pFileUrl->GetObfuscatedURL().c_str() );
2953 
2954  Status st;
2955  if( self->pStateRedirect )
2956  {
2957  SendClose( self, 0 );
2958  st = ReOpenFileAtServer( self, *self->pStateRedirect, 0 );
2959  delete self->pStateRedirect; self->pStateRedirect = 0;
2960  }
2961  else if( self->IsReadOnly() && self->pLoadBalancer )
2962  st = ReOpenFileAtServer( self, *self->pLoadBalancer, 0 );
2963  else
2964  st = ReOpenFileAtServer( self, *self->pDataServer, 0 );
2965 
2966  if( !st.IsOK() )
2967  {
2968  self->pFileState = Error;
2969  self->pStatus = st;
2970  self->FailQueuedMessages( st );
2971  }
2972 
2973  return st;
2974  }
2975 
2976  //----------------------------------------------------------------------------
2977  // Send a close and ignore the response
      //
      // Builds a kXR_close request for the file handle in pFileHandle, stamps it
      // with the current session id and issues it to the current data server as
      // a stateful request that does not follow redirects. The callback visible
      // below only drops its captured copy of `self`.
      //
      // Returns the status of IssueRequest.
      //
      // NOTE(review): the numbering embedded in this listing skips 2989, 2991 and
      // 2998, so the statement that creates `handler` is only partially visible
      // and one statement before the final return is missing.
2978  //----------------------------------------------------------------------------
2979  XRootDStatus FileStateHandler::SendClose( std::shared_ptr<FileStateHandler> &self,
2980  uint16_t timeout )
2981  {
2982  Message *msg;
2983  ClientCloseRequest *req;
2984  MessageUtils::CreateRequest( msg, req );
2985 
2986  req->requestid = kXR_close;
2987  memcpy( req->fhandle, self->pFileHandle, 4 );
2988 
2990  msg->SetSessionId( self->pSessionId );
      // `self` is captured by value, so the lambda holds its own shared_ptr
      // until it is invoked and resets it
2992  [self]( XRootDStatus&, AnyObject& ) mutable { self.reset(); } );
2993  MessageSendParams params;
2994  params.timeout = timeout;
2995  params.followRedirects = false;
2996  params.stateful = true;
2997 
2999 
3000  return self->IssueRequest( *self->pDataServer, msg, handler, params );
3001  }
3002 
3003  //----------------------------------------------------------------------------
3004  // Re-open the current file at a given server
      //
      // Builds a kXR_open request from the stored open mode and (adjusted) open
      // flags and issues it to `url` with a freshly created OpenHandler.
      //
      // Note that pOpenFlags is modified permanently: kXR_delete is replaced by
      // kXR_open_updt and kXR_new is cleared.
      //
      // If the request cannot be issued the OpenHandler is destroyed, the status
      // is stored in pStatus and the file state becomes Closed.
      //
      // Returns the status of IssueRequest.
      //
      // NOTE(review): the numbering embedded in this listing skips 3048 and 3049,
      // so two statements between the creation of `params` and the call to
      // IssueRequest are not visible here.
3005  //----------------------------------------------------------------------------
3006  XRootDStatus FileStateHandler::ReOpenFileAtServer( std::shared_ptr<FileStateHandler> &self,
3007  const URL &url,
3008  uint16_t timeout )
3009  {
3010  Log *log = DefaultEnv::GetLog();
3011  log->Dump( FileMsg, "[0x%x@%s] Sending a recovery open command to %s",
3012  self.get(), self->pFileUrl->GetObfuscatedURL().c_str(), url.GetObfuscatedURL().c_str() );
3013 
3014  //--------------------------------------------------------------------------
3015  // Remove the kXR_delete and kXR_new flags, as we don't want the recovery
3016  // procedure to delete a file that has been partially updated or fail it
3017  // because a partially uploaded file already exists.
3018  //--------------------------------------------------------------------------
3019  if( self->pOpenFlags & kXR_delete)
3020  {
3021  self->pOpenFlags &= ~kXR_delete;
3022  self->pOpenFlags |= kXR_open_updt;
3023  }
3024 
3025  self->pOpenFlags &= ~kXR_new;
3026 
3027  Message *msg;
3028  ClientOpenRequest *req;
3029  URL u = url;
3030 
      // A target URL without a path inherits the path of the file's own URL.
      // Only the local copy `u` is changed; it is used to build the request
      // path, while the request itself is issued to the original `url`.
3031  if( url.GetPath().empty() )
3032  u.SetPath( self->pFileUrl->GetPath() );
3033 
3034  std::string path = u.GetPathWithFilteredParams();
3035  MessageUtils::CreateRequest( msg, req, path.length() );
3036 
3037  req->requestid = kXR_open;
3038  req->mode = self->pOpenMode;
3039  req->options = self->pOpenFlags;
3040  req->dlen = path.length();
      // The path is placed at offset 24 of the message buffer
3041  msg->Append( path.c_str(), path.length(), 24 );
3042 
3043  // create a new reopen handler
3044  // (it is not assigned to 'pReOpenHandler' in order not to bump the reference counter
3045  // until we know that 'SendMessage' was successful)
3046  OpenHandler *openHandler = new OpenHandler( self, 0 );
3047  MessageSendParams params; params.timeout = timeout;
3050 
3051  //--------------------------------------------------------------------------
3052  // Issue the open request
3053  //--------------------------------------------------------------------------
3054  XRootDStatus st = self->IssueRequest( url, msg, openHandler, params );
3055 
3056  // if there was a problem destroy the open handler
3057  if( !st.IsOK() )
3058  {
3059  delete openHandler;
3060  self->pStatus = st;
3061  self->pFileState = Closed;
3062  }
3063  return st;
3064  }
3065 
3066  //------------------------------------------------------------------------
3067  // Fail a message
      //
      // Reports `status` to the user handler wrapped by the request's
      // StatefulHandler: a ResponseJob carrying a copy of the status is queued
      // on jobMan, and the StatefulHandler itself is deleted afterwards.
      //
      // If rd.handler is not a StatefulHandler an error is logged and nothing
      // else happens.
      //
      // NOTE(review): the numbering embedded in this listing skips 3087, so the
      // declaration of jobMan is not visible here.
3068  //------------------------------------------------------------------------
3069  void FileStateHandler::FailMessage( RequestData rd, XRootDStatus status )
3070  {
3071  Log *log = DefaultEnv::GetLog();
3072  log->Dump( FileMsg, "[0x%x@%s] Failing message %s with %s",
3073  this, pFileUrl->GetObfuscatedURL().c_str(),
3074  rd.request->GetObfuscatedDescription().c_str(),
3075  status.ToStr().c_str() );
3076 
      // The user handler can only be reached through a StatefulHandler
3077  StatefulHandler *sh = dynamic_cast<StatefulHandler*>(rd.handler);
3078  if( !sh )
3079  {
3080  Log *log = DefaultEnv::GetLog();
3081  log->Error( FileMsg, "[0x%x@%s] Internal error while recovering %s",
3082  this, pFileUrl->GetObfuscatedURL().c_str(),
3083  rd.request->GetObfuscatedDescription().c_str() );
3084  return;
3085  }
3086 
3088  ResponseHandler *userHandler = sh->GetUserHandler();
3089  jobMan->QueueJob( new ResponseJob(
3090  userHandler,
3091  new XRootDStatus( status ),
3092  0, rd.params.hostList ) );
3093 
3094  delete sh;
3095  }
3096 
3097  //----------------------------------------------------------------------------
3098  // Fail queued messages
3099  //----------------------------------------------------------------------------
3100  void FileStateHandler::FailQueuedMessages( XRootDStatus status )
3101  {
3102  RequestList::iterator it;
3103  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3104  FailMessage( *it, status );
3105  pToBeRecovered.clear();
3106  }
3107 
3108  //------------------------------------------------------------------------
3109  // Re-send queued messages
3110  //------------------------------------------------------------------------
3111  void FileStateHandler::ReSendQueuedMessages()
3112  {
3113  RequestList::iterator it;
3114  for( it = pToBeRecovered.begin(); it != pToBeRecovered.end(); ++it )
3115  {
3116  it->request->SetSessionId( pSessionId );
3117  ReWriteFileHandle( it->request );
3118  XRootDStatus st = IssueRequest( *pDataServer, it->request,
3119  it->handler, it->params );
3120  if( !st.IsOK() )
3121  FailMessage( *it, st );
3122  }
3123  pToBeRecovered.clear();
3124  }
3125 
3126  //------------------------------------------------------------------------
3127  // Re-write file handle
      //
      // Copies the 4-byte handle in pFileHandle into the request held by `msg`.
      // Where the handle goes depends on the request id: read, write, sync,
      // truncate, pgread and pgwrite carry it in the request itself, whereas
      // readv and writev carry one handle per list element starting at offset
      // 24 of the message buffer. Requests with any other id are not modified.
      //
      // NOTE(review): the numbering embedded in this listing skips 3131, 3136,
      // 3142, 3148, 3154, 3160, 3179, 3185 and 3195, so the declarations of
      // `hdr` and of most per-case `req` pointers (and one trailing statement)
      // are not visible here.
3128  //------------------------------------------------------------------------
3129  void FileStateHandler::ReWriteFileHandle( Message *msg )
3130  {
3132  switch( hdr->requestid )
3133  {
3134  case kXR_read:
3135  {
3137  memcpy( req->fhandle, pFileHandle, 4 );
3138  break;
3139  }
3140  case kXR_write:
3141  {
3143  memcpy( req->fhandle, pFileHandle, 4 );
3144  break;
3145  }
3146  case kXR_sync:
3147  {
3149  memcpy( req->fhandle, pFileHandle, 4 );
3150  break;
3151  }
3152  case kXR_truncate:
3153  {
3155  memcpy( req->fhandle, pFileHandle, 4 );
3156  break;
3157  }
3158  case kXR_readv:
3159  {
      // One handle per readahead_list element; element count comes from dlen
3161  readahead_list *dataChunk = (readahead_list*)msg->GetBuffer( 24 );
3162  for( size_t i = 0; i < req->dlen/sizeof(readahead_list); ++i )
3163  memcpy( dataChunk[i].fhandle, pFileHandle, 4 );
3164  break;
3165  }
3166  case kXR_writev:
3167  {
      // One handle per write_list element; element count comes from dlen
3168  ClientWriteVRequest *req =
3169  reinterpret_cast<ClientWriteVRequest*>( msg->GetBuffer() );
3170  XrdProto::write_list *wrtList =
3171  reinterpret_cast<XrdProto::write_list*>( msg->GetBuffer( 24 ) );
3172  size_t size = req->dlen / sizeof(XrdProto::write_list);
3173  for( size_t i = 0; i < size; ++i )
3174  memcpy( wrtList[i].fhandle, pFileHandle, 4 );
3175  break;
3176  }
3177  case kXR_pgread:
3178  {
3180  memcpy( req->fhandle, pFileHandle, 4 );
3181  break;
3182  }
3183  case kXR_pgwrite:
3184  {
3186  memcpy( req->fhandle, pFileHandle, 4 );
3187  break;
3188  }
3189  }
3190 
3191  Log *log = DefaultEnv::GetLog();
3192  log->Dump( FileMsg, "[0x%x@%s] Rewritten file handle for %s to 0x%x",
3193  this, pFileUrl->GetObfuscatedURL().c_str(), msg->GetObfuscatedDescription().c_str(),
3194  *((uint32_t*)pFileHandle) );
3196  }
3197 
3198  //----------------------------------------------------------------------------
3199  // Dispatch monitoring information on close
      //
      // When a monitor is available (`mon` is non-null) an event record is
      // filled with the file URL, the open time, the current time as close
      // time, the accumulated read / vector-read / write / vector-write
      // statistics and `status`, and sent as a Monitor::EvClose event.
      //
      // NOTE(review): the numbering embedded in this listing skips 3203 and 3206,
      // so the declarations of `mon` and of the record `i` are not visible here.
3200  //----------------------------------------------------------------------------
3201  void FileStateHandler::MonitorClose( const XRootDStatus *status )
3202  {
3204  if( mon )
3205  {
3207  i.file = pFileUrl;
3208  i.oTOD = pOpenTime;
      // Close timestamp is taken right here
3209  gettimeofday( &i.cTOD, 0 );
3210  i.rBytes = pRBytes;
3211  i.vrBytes = pVRBytes;
3212  i.wBytes = pWBytes;
3213  i.vwBytes = pVWBytes;
3214  i.vSegs = pVSegs;
3215  i.rCount = pRCount;
3216  i.vCount = pVRCount;
3217  i.wCount = pWCount;
3218  i.status = status;
3219  mon->Event( Monitor::EvClose, &i );
3220  }
3221  }
3222 
3223  XRootDStatus FileStateHandler::IssueRequest( const URL &url,
3224  Message *msg,
3225  ResponseHandler *handler,
3226  MessageSendParams &sendParams )
3227  {
3228  // first handle Metalinks
3229  if( pUseVirtRedirector && url.IsMetalink() )
3230  return MessageUtils::RedirectMessage( url, msg, handler,
3231  sendParams, pLFileHandler );
3232 
3233  // than local file access
3234  if( url.IsLocalFile() )
3235  return pLFileHandler->ExecRequest( url, msg, handler, sendParams );
3236 
3237  // and finally ordinary XRootD requests
3238  return MessageUtils::SendMessage( url, msg, handler,
3239  sendParams, pLFileHandler );
3240  }
3241 
3242  //------------------------------------------------------------------------
3243  // Send a write request with payload being stored in a kernel buffer
      //
      // With the file mutex held, builds a kXR_write request for `length` bytes
      // at `offset` using the current file handle, attaches the kernel buffer
      // and an empty ChunkList to the send parameters, wraps the user handler
      // in a StatefulHandler and passes the request to SendOrQueue.
      //
      // The raw pointer released from `kbuff` is stored in params.kbuff, so the
      // unique_ptr no longer owns the buffer after that point.
      //
      // Returns errInvalidOp when the file is neither Opened nor Recovering,
      // otherwise the status of SendOrQueue.
      //
      // NOTE(review): the numbering embedded in this listing skips 3281 and 3283,
      // so two statements between the parameter setup and the creation of the
      // StatefulHandler are not visible here.
3244  //------------------------------------------------------------------------
3245  XRootDStatus FileStateHandler::WriteKernelBuffer( std::shared_ptr<FileStateHandler> &self,
3246  uint64_t offset,
3247  uint32_t length,
3248  std::unique_ptr<XrdSys::KernelBuffer> kbuff,
3249  ResponseHandler *handler,
3250  uint16_t timeout )
3251  {
3252  //--------------------------------------------------------------------------
3253  // Create the write request
3254  //--------------------------------------------------------------------------
3255  XrdSysMutexHelper scopedLock( self->pMutex );
3256 
3257  if( self->pFileState != Opened && self->pFileState != Recovering )
3258  return XRootDStatus( stError, errInvalidOp );
3259 
3260  Log *log = DefaultEnv::GetLog();
3261  log->Debug( FileMsg, "[0x%x@%s] Sending a write command for handle 0x%x to "
3262  "%s", self.get(), self->pFileUrl->GetObfuscatedURL().c_str(),
3263  *((uint32_t*)self->pFileHandle), self->pDataServer->GetHostId().c_str() );
3264 
3265  Message *msg;
3266  ClientWriteRequest *req;
3267  MessageUtils::CreateRequest( msg, req );
3268 
3269  req->requestid = kXR_write;
3270  req->offset = offset;
3271  req->dlen = length;
3272  memcpy( req->fhandle, self->pFileHandle, 4 );
3273 
3274  MessageSendParams params;
3275  params.timeout = timeout;
3276  params.followRedirects = false;
3277  params.stateful = true;
      // Ownership of the kernel buffer leaves the unique_ptr here
3278  params.kbuff = kbuff.release();
3279  params.chunkList = new ChunkList();
3280 
3282 
3284  StatefulHandler *stHandler = new StatefulHandler( self, handler, msg, params );
3285 
3286  return SendOrQueue( self, *self->pDataServer, msg, stHandler, params );
3287  }
3288 }
kXR_unt16 requestid
Definition: XProtocol.hh:479
kXR_unt16 requestid
Definition: XProtocol.hh:630
kXR_unt16 requestid
Definition: XProtocol.hh:806
@ kXR_fattrDel
Definition: XProtocol.hh:270
@ kXR_fattrSet
Definition: XProtocol.hh:273
@ kXR_fattrList
Definition: XProtocol.hh:272
@ kXR_fattrGet
Definition: XProtocol.hh:271
#define kXR_suppgrw
Definition: XProtocol.hh:1174
kXR_char fhandle[4]
Definition: XProtocol.hh:531
kXR_char fhandle[4]
Definition: XProtocol.hh:782
struct ClientPgReadRequest pgread
Definition: XProtocol.hh:861
kXR_char fhandle[4]
Definition: XProtocol.hh:807
kXR_char fhandle[4]
Definition: XProtocol.hh:771
kXR_int64 offset
Definition: XProtocol.hh:646
kXR_unt16 requestid
Definition: XProtocol.hh:644
@ kXR_virtReadv
Definition: XProtocol.hh:150
kXR_unt16 options
Definition: XProtocol.hh:481
static const int kXR_ckpXeq
Definition: XProtocol.hh:216
struct ClientPgWriteRequest pgwrite
Definition: XProtocol.hh:862
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_async
Definition: XProtocol.hh:458
@ kXR_delete
Definition: XProtocol.hh:453
@ kXR_open_read
Definition: XProtocol.hh:456
@ kXR_open_updt
Definition: XProtocol.hh:457
@ kXR_new
Definition: XProtocol.hh:455
@ kXR_open_apnd
Definition: XProtocol.hh:462
@ kXR_retstat
Definition: XProtocol.hh:463
struct ClientRequestHdr header
Definition: XProtocol.hh:846
kXR_char fhandle[4]
Definition: XProtocol.hh:509
#define kXR_recoverWrts
Definition: XProtocol.hh:1166
kXR_unt16 infotype
Definition: XProtocol.hh:631
kXR_char fhandle[4]
Definition: XProtocol.hh:645
kXR_char fhandle[4]
Definition: XProtocol.hh:229
kXR_unt16 requestid
Definition: XProtocol.hh:157
kXR_char fhandle[4]
Definition: XProtocol.hh:633
@ kXR_read
Definition: XProtocol.hh:125
@ kXR_open
Definition: XProtocol.hh:122
@ kXR_writev
Definition: XProtocol.hh:143
@ kXR_readv
Definition: XProtocol.hh:137
@ kXR_sync
Definition: XProtocol.hh:128
@ kXR_fattr
Definition: XProtocol.hh:132
@ kXR_query
Definition: XProtocol.hh:113
@ kXR_write
Definition: XProtocol.hh:131
@ kXR_truncate
Definition: XProtocol.hh:140
@ kXR_stat
Definition: XProtocol.hh:129
@ kXR_pgread
Definition: XProtocol.hh:142
@ kXR_chkpoint
Definition: XProtocol.hh:124
@ kXR_close
Definition: XProtocol.hh:115
@ kXR_pgwrite
Definition: XProtocol.hh:138
struct ClientReadRequest read
Definition: XProtocol.hh:867
kXR_int32 rlen
Definition: XProtocol.hh:660
kXR_unt16 requestid
Definition: XProtocol.hh:768
kXR_int32 dlen
Definition: XProtocol.hh:483
kXR_char fhandle[4]
Definition: XProtocol.hh:794
kXR_unt16 mode
Definition: XProtocol.hh:480
kXR_unt16 requestid
Definition: XProtocol.hh:508
kXR_unt16 requestid
Definition: XProtocol.hh:781
kXR_char fhandle[4]
Definition: XProtocol.hh:204
kXR_int64 offset
Definition: XProtocol.hh:661
#define kXR_PROTPGRWVERSION
Definition: XProtocol.hh:73
kXR_int64 offset
Definition: XProtocol.hh:808
struct ClientWriteRequest write
Definition: XProtocol.hh:876
kXR_int32 rlen
Definition: XProtocol.hh:647
kXR_unt16 requestid
Definition: XProtocol.hh:670
@ kXR_Qopaqug
Definition: XProtocol.hh:625
@ kXR_Qvisa
Definition: XProtocol.hh:622
kXR_int32 dlen
Definition: XProtocol.hh:159
unsigned char kXR_char
Definition: XPtypes.hh:65
static int mapError(int rc)
Definition: XProtocol.hh:1361
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
Definition: XrdClBuffer.hh:34
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void Append(const char *buffer, uint32_t size)
Append data at the position pointed to by the append cursor.
Definition: XrdClBuffer.hh:164
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static FileTimer * GetFileTimer()
Get file timer task.
static ForkHandler * GetForkHandler()
Get the fork handler.
static Env * GetEnv()
Get default client environment.
XRootDStatus Open(uint16_t flags, ResponseHandler *handler, uint16_t timeout)
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
An interface for file plug-ins.
static XRootDStatus PgReadRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, size_t pgnb, void *buffer, PgReadHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, kXR_char flags, ResponseHandler *handler, uint16_t timeout=0)
void AfterForkChild()
Called in the child process after the fork.
static XRootDStatus Stat(std::shared_ptr< FileStateHandler > &self, bool force, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateRedirection(std::shared_ptr< FileStateHandler > &self, const std::string &redirectUrl, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle stateful redirect.
static XRootDStatus Sync(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
void TimeOutRequests(time_t now)
Declare timeout on requests being recovered.
static XRootDStatus DelXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus GetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< std::string > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ListXAttr(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus SetXAttr(std::shared_ptr< FileStateHandler > &self, const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
static void OnStateError(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, ResponseHandler *userHandler, MessageSendParams &sendParams)
Handle an error while sending a stateful message.
FileStateHandler(FilePlugIn *&plugin)
Constructor.
static XRootDStatus ReadV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgReadImpl(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, uint16_t flags, ResponseHandler *handler, uint16_t timeout=0)
@ OpenInProgress
Opening is in progress.
@ CloseInProgress
Closing operation is in progress.
@ Closed
The file is closed.
@ Opened
Opening has succeeded.
@ Error
Opening has failed.
@ Recovering
Recovering from an error.
static XRootDStatus ChkptWrt(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool SetProperty(const std::string &name, const std::string &value)
static void OnStateResponse(std::shared_ptr< FileStateHandler > &self, XRootDStatus *status, Message *message, AnyObject *response, HostList *hostList)
Handle stateful response.
static XRootDStatus Read(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
void OnClose(const XRootDStatus *status)
Process the results of the closing operation.
static XRootDStatus Fcntl(std::shared_ptr< FileStateHandler > &self, const Buffer &arg, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Truncate(std::shared_ptr< FileStateHandler > &self, uint64_t size, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Close(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus ChkptWrtV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWrite(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, std::vector< uint32_t > &cksums, ResponseHandler *handler, uint16_t timeout=0)
void OnOpen(const XRootDStatus *status, const OpenInfo *openInfo, const HostList *hostList)
Process the results of the opening operation.
static XRootDStatus PgRead(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus PgWriteRetry(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, uint32_t digest, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorWrite(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus WriteV(std::shared_ptr< FileStateHandler > &self, uint64_t offset, const struct iovec *iov, int iovcnt, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Visa(std::shared_ptr< FileStateHandler > &self, ResponseHandler *handler, uint16_t timeout=0)
bool GetProperty(const std::string &name, std::string &value) const
static XRootDStatus Open(std::shared_ptr< FileStateHandler > &self, const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus VectorRead(std::shared_ptr< FileStateHandler > &self, const ChunkList &chunks, void *buffer, ResponseHandler *handler, uint16_t timeout=0)
bool IsOpen() const
Check if the file is open.
static XRootDStatus Write(std::shared_ptr< FileStateHandler > &self, uint64_t offset, uint32_t size, const void *buffer, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus Checkpoint(std::shared_ptr< FileStateHandler > &self, kXR_char code, ResponseHandler *handler, uint16_t timeout=0)
static XRootDStatus TryOtherServer(std::shared_ptr< FileStateHandler > &self, uint16_t timeout)
Try other data server.
void UnRegisterFileObject(FileStateHandler *file)
Un-register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file state handler.
void RegisterFileObject(FileStateHandler *file)
Register a file object.
void UnRegisterFileObject(FileStateHandler *file)
A synchronized queue.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
XRootDStatus ExecRequest(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams)
Translate an XRootD request into LocalFileHandler call.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static Status CreateXAttrBody(Message *msg, const std::vector< T > &vec, const std::string &path="")
static Status RedirectMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Redirect message.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
void SetSessionId(uint64_t sessionId)
Set the session ID which this message is meant for.
void SetVirtReqID(uint16_t virtReqID)
Set virtual request ID for the message.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvClose
CloseInfo: File closed.
@ EvErrIO
ErrorInfo: An I/O error occurred.
@ EvOpen
OpenInfo: File opened.
virtual void Event(EventCode evCode, void *evData)=0
Information returned by file open operation.
void GetFileHandle(uint8_t *fileHandle) const
Get the file handle (4 bytes)
const StatInfo * GetStatInfo() const
Get the stat info.
uint64_t GetSessionId() const
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object used by the post master.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
Handle an async response.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static ResponseHandler * Wrap(std::function< void(XRootDStatus &, AnyObject &)> func)
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Call the user callback.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:458
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:395
std::string GetPathWithFilteredParams() const
Get the path with params, filters out 'xrdcl.'.
Definition: XrdClURL.cc:324
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition: XrdClURL.cc:491
void SetPath(const std::string &path)
Set the path.
Definition: XrdClURL.hh:225
bool IsLocalFile() const
Definition: XrdClURL.cc:467
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static XrdCl::XRootDStatus GetProtocolVersion(const XrdCl::URL url, int &protver)
Definition: XrdClUtils.hh:235
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static void SetDescription(Message *msg)
Set the description of a message.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition: XrdOucCRC.cc:190
static void csCalc(const char *data, off_t offs, size_t count, uint32_t *csval)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
static bool IsPageAligned(const void *ptr)
const uint16_t errSocketOptError
Definition: XrdClStatus.hh:76
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errPollerError
Definition: XrdClStatus.hh:75
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInProgress
Definition: XrdClStatus.hh:59
const uint16_t errSocketTimeout
Definition: XrdClStatus.hh:73
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const uint64_t FileMsg
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
const uint16_t errConnectionError
Definition: XrdClStatus.hh:78
const uint16_t errSocketError
Definition: XrdClStatus.hh:72
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errSocketDisconnected
Definition: XrdClStatus.hh:74
static const int PageSize
ssize_t Read(int fd, KernelBuffer &buffer, uint32_t length, int64_t offset)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
static const int aData
Definition: XProtocol.hh:298
kXR_char fhandle[4]
Definition: XProtocol.hh:288
kXR_unt16 requestid
Definition: XProtocol.hh:287
kXR_unt16 requestid
Definition: XProtocol.hh:820
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
std::vector< uint32_t > crc32cDigests
XrdSys::KernelBuffer * kbuff
Describe a file close event.
uint64_t vwBytes
Total number of bytes written via writev.
const XRootDStatus * status
Close status.
uint32_t wCount
Total count of writes.
uint64_t vSegs
Total count of readv segments.
uint64_t vrBytes
Total number of bytes read via readv.
timeval cTOD
gettimeofday() when file was closed
uint32_t vCount
Total count of readv.
const URL * file
The file in question.
uint64_t rBytes
Total number of bytes read via read.
timeval oTOD
gettimeofday() when file was opened
uint64_t wBytes
Total number of bytes written.
uint32_t rCount
Total count of reads.
Describe an encountered file-based error.
@ ErrUnc
Unclassified operation.
const XRootDStatus * status
Status code.
const URL * file
The file in question.
Operation opCode
The associated operation.
Describe a file open event to the monitor.
uint64_t fSize
File size in bytes.
const URL * file
File in question.
std::string dataServer
Actual data server.
uint16_t oFlags
OpenFlags.
void SetNbRepair(size_t nbrepair)
Set number of repaired pages.
std::vector< uint32_t > & GetCksums()
Get the checksums.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
std::tuple< uint64_t, uint32_t > At(size_t i)
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
static const uint16_t ServerFlags
returns server flags
static const uint16_t IsEncrypted
returns true if the channel is encrypted