XRootD
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27 
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClMessage.hh"
32 #include "XProtocol/XProtocol.hh"
33 #include "XrdCl/XrdClLog.hh"
34 #include "XrdCl/XrdClConstants.hh"
35 
41 
42 #include "XrdSys/XrdSysPthread.hh"
43 #include "XrdSys/XrdSysPageSize.hh"
45 #include "XrdSys/XrdSysPlatform.hh"
46 
48 #include "XrdOuc/XrdOucUtils.hh"
49 
50 #include <sys/uio.h>
51 #include <arpa/inet.h> // for network unmarshaling stuff
52 
53 #include <array>
54 #include <list>
55 #include <memory>
56 #include <atomic>
57 #include <memory>
58 
59 namespace XrdCl
60 {
61  class PostMaster;
62  class SIDManager;
63  class URL;
64  class LocalFileHandler;
65  class Socket;
66 
67  //----------------------------------------------------------------------------
68  // Single entry in the redirect-trace-back
69  //----------------------------------------------------------------------------
71  {
72  enum Type
73  {
77  EntryWait
78  };
79 
// Build a trace-back entry: the source URL, the target URL and the kind of
// hop are used to initialise the same-named members.
80 RedirectEntry( const URL &from, const URL &to, Type type ) :
81 from( from ), to( to ), type( type )
82 {
83 
84 }
85 
90 
// Render this entry as one human-readable line of the redirect trace-back.
//
// @param prevok when true (the default) the wording is selected by the
//               entry type; when false the entry is reported as a failure
//               at `from` followed by a retry at `to`
// @return the formatted description, built from the locations of the
//         `from` and `to` URLs
91 std::string ToString( bool prevok = true )
92 {
93 const std::string tostr = to.GetLocation();
94 const std::string fromstr = from.GetLocation();
95 
96 if( prevok )
97 {
98 switch( type )
99 {
100 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
101 + tostr;
102 
103 case EntryRedirectOnWait: return "Server responded with wait. "
104 "Falling back to virtual redirector: " + tostr;
105 
106 case EntryRetry: return "Retrying: " + tostr;
107 
108 case EntryWait: return "Waited at server request. Resending: "
109 + tostr;
110 }
111 }
// Reached when prevok is false, and also when prevok is true but the type
// matches none of the cases above (the switch has no default).
112 return "Failed at: " + fromstr + ", retrying at: " + tostr;
113 }
114  };
115 
116 //----------------------------------------------------------------------------
117 //! Handle/Process/Forward XRootD messages
118 //----------------------------------------------------------------------------
120  {
121  friend class HandleRspJob;
122 
123  public:
124  //------------------------------------------------------------------------
133  //------------------------------------------------------------------------
135  ResponseHandler *respHandler,
136  const URL *url,
137  std::shared_ptr<SIDManager> sidMgr,
138  LocalFileHandler *lFileHandler):
139  pRequest( msg ),
140  pResponseHandler( respHandler ),
141  pUrl( *url ),
142  pEffectiveDataServerUrl( 0 ),
143  pSidMgr( sidMgr ),
144  pLFileHandler( lFileHandler ),
145  pExpiration( 0 ),
146  pRedirectAsAnswer( false ),
147  pOksofarAsAnswer( false ),
148  pHasLoadBalancer( false ),
149  pHasSessionId( false ),
150  pChunkList( 0 ),
151  pKBuff( 0 ),
152  pRedirectCounter( 0 ),
153  pNotAuthorizedCounter( 0 ),
154 
155  pAsyncOffset( 0 ),
156  pAsyncChunkIndex( 0 ),
157 
158  pPgWrtCksumBuff( 4 ),
159  pPgWrtCurrentPageOffset( 0 ),
160  pPgWrtCurrentPageNb( 0 ),
161 
162  pOtherRawStarted( false ),
163 
164  pFollowMetalink( false ),
165 
166  pStateful( false ),
167 
168  pAggregatedWaitTime( 0 ),
169 
170  pSendingState( 0 ),
171 
172  pTimeoutFence( false ),
173 
174  pDirListStarted( false ),
175  pDirListWithStat( false ),
176 
177  pCV( 0 ),
178 
179  pSslErrCnt( 0 )
180  {
181  pPostMaster = DefaultEnv::GetPostMaster();
182  if( msg->GetSessionId() )
183  pHasSessionId = true;
184 
185  Log *log = DefaultEnv::GetLog();
186  log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
187  pUrl.GetHostId().c_str(), (void*)this,
188  pRequest->GetObfuscatedDescription().c_str() );
189 
190  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191  if( ntohs( hdr->requestid ) == kXR_pgread )
192  {
193  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195  ntohl( pgrdreq->rlen ) ) );
196  }
197 
198  //----------------------------------------------------------------------
199  // Pass the reader our pUrl, not *url. The latter is a reference, likely
200  // from FileStateHandler such as *pDataServer. Accessing that throughout
201  // our lifetime may lead to concurrent access. In the case of read-
202  // recovery the FileStateHandler may entirely reallocate the url object.
203  //----------------------------------------------------------------------
204  if( ntohs( hdr->requestid ) == kXR_readv )
205  pBodyReader.reset( new AsyncVectorReader( pUrl, *pRequest ) );
206  else if( ntohs( hdr->requestid ) == kXR_read )
207  pBodyReader.reset( new AsyncRawReader( pUrl, *pRequest ) );
208  else
209  pBodyReader.reset( new AsyncDiscardReader( pUrl, *pRequest ) );
210  }
211 
212  //------------------------------------------------------------------------
214  //------------------------------------------------------------------------
216  {
217  DumpRedirectTraceBack();
218 
219  if( !pHasSessionId )
220  delete pRequest;
221  delete pEffectiveDataServerUrl;
222 
223  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
224  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
225  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
226  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
227  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
228  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
229 
230  Log *log = DefaultEnv::GetLog();
231  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
232  pUrl.GetHostId().c_str(), (void*)this );
233  }
234 
235  //------------------------------------------------------------------------
241  //------------------------------------------------------------------------
242  virtual uint16_t Examine( std::shared_ptr<Message> &msg ) override;
243 
244  //------------------------------------------------------------------------
253  //------------------------------------------------------------------------
254  virtual uint16_t InspectStatusRsp() override;
255 
256  //------------------------------------------------------------------------
260  //------------------------------------------------------------------------
261  virtual uint16_t GetSid() const override;
262 
263 //------------------------------------------------------------------------
//! Process the message if it was "taken" by the examine action
267 //------------------------------------------------------------------------
268 virtual void Process() override;
269 
270  //------------------------------------------------------------------------
280  //------------------------------------------------------------------------
281  virtual XRootDStatus ReadMessageBody( Message *msg,
282  Socket *socket,
283  uint32_t &bytesRead ) override;
284 
285  //------------------------------------------------------------------------
290  //------------------------------------------------------------------------
291  virtual uint8_t OnStreamEvent( StreamEvent event,
292  XRootDStatus status ) override;
293 
294 //------------------------------------------------------------------------
//! The requested action has been performed and the status is available
296 //------------------------------------------------------------------------
297 virtual void OnStatusReady( const Message *message,
298 XRootDStatus status ) override;
299 
300 //------------------------------------------------------------------------
//! Are we a raw writer or not?
302 //------------------------------------------------------------------------
303 virtual bool IsRaw() const override;
304 
305  //------------------------------------------------------------------------
314  //------------------------------------------------------------------------
316  uint32_t &bytesWritten ) override;
317 
318  //------------------------------------------------------------------------
322  //------------------------------------------------------------------------
323  void WaitDone( time_t now );
324 
325 //------------------------------------------------------------------------
//! Set a timestamp after which we give up
327 //------------------------------------------------------------------------
328 void SetExpiration( time_t expiration )
329 {
330 pExpiration = expiration;
331 }
332 
333 //------------------------------------------------------------------------
//! Get a timestamp after which we give up
335 //------------------------------------------------------------------------
336 time_t GetExpiration() override
337 {
338 return pExpiration;
339 }
340 
341 //------------------------------------------------------------------------
//! Store the given flag in pRedirectAsAnswer.
//! NOTE(review): presumably, when set, a redirect response is handed to
//! the user's handler as the answer instead of being followed - confirm
//! against the implementation file.
344 //------------------------------------------------------------------------
345 void SetRedirectAsAnswer( bool redirectAsAnswer )
346 {
347 pRedirectAsAnswer = redirectAsAnswer;
348 }
349 
350 //------------------------------------------------------------------------
//! Store the given flag in pOksofarAsAnswer.
//! NOTE(review): presumably, when set, kXR_oksofar (partial) responses
//! are served to the user's handler as responses - confirm against the
//! implementation file.
353 //------------------------------------------------------------------------
354 void SetOksofarAsAnswer( bool oksofarAsAnswer )
355 {
356 pOksofarAsAnswer = oksofarAsAnswer;
357 }
358 
359 //------------------------------------------------------------------------
//! Get the request pointer
361 //------------------------------------------------------------------------
362 const Message *GetRequest() const
363 {
364 return pRequest;
365 }
366 
367 //------------------------------------------------------------------------
//! Set the load balancer
//!
//! A load balancer whose URL is not valid is ignored: neither
//! pLoadBalancer nor pHasLoadBalancer is touched in that case.
369 //------------------------------------------------------------------------
370 void SetLoadBalancer( const HostInfo &loadBalancer )
371 {
372 if( !loadBalancer.url.IsValid() )
373 return;
374 pLoadBalancer = loadBalancer;
375 pHasLoadBalancer = true;
376 }
377 
378 //------------------------------------------------------------------------
//! Set host list
//!
//! The pointer is handed to pHosts (a std::unique_ptr), so the handler
//! takes ownership of the list and any list held before is destroyed.
380 //------------------------------------------------------------------------
381 void SetHostList( HostList *hostList )
382 {
383 pHosts.reset( hostList );
384 }
385 
386 //------------------------------------------------------------------------
//! Set the chunk list
//!
//! The pointer is stored in pChunkList and, when a body reader exists,
//! forwarded to it as well. pChunkStatus is resized to one entry per
//! chunk, or cleared when the list pointer is null.
388 //------------------------------------------------------------------------
389 void SetChunkList( ChunkList *chunkList )
390 {
391 pChunkList = chunkList;
392 if( pBodyReader )
393 pBodyReader->SetChunkList( chunkList );
394 if( chunkList )
395 pChunkStatus.resize( chunkList->size() );
396 else
397 pChunkStatus.clear();
398 }
399 
// Replace the contents of pCrc32cDigests with the supplied vector; the
// argument is moved from, not copied.
400 void SetCrc32cDigests( std::vector<uint32_t> && crc32cDigests )
401 {
402 pCrc32cDigests = std::move( crc32cDigests );
403 }
404 
405  //------------------------------------------------------------------------
407  //------------------------------------------------------------------------
409  {
410  pKBuff = kbuff;
411  }
412 
413 //------------------------------------------------------------------------
//! Set the redirect counter
415 //------------------------------------------------------------------------
416 void SetRedirectCounter( uint16_t redirectCounter )
417 {
418 pRedirectCounter = redirectCounter;
419 }
420 
// Store the given flag in pFollowMetalink.
// NOTE(review): how the flag affects redirect handling is decided in the
// implementation file, which is not visible here.
421 void SetFollowMetalink( bool followMetalink )
422 {
423 pFollowMetalink = followMetalink;
424 }
425 
// Store the given flag in pStateful.
// NOTE(review): presumably marks the request as a stateful operation -
// confirm against the callers.
426 void SetStateful( bool stateful )
427 {
428 pStateful = stateful;
429 }
430 
431  //------------------------------------------------------------------------
435  //------------------------------------------------------------------------
436  void PartialReceived();
437 
// Sets the kSawReadySend bit in pSendingState, leaving the other bits as
// they are. The message argument is not used.
438 void OnReadyToSend( [[maybe_unused]] Message *msg ) override
439 {
440 pSendingState |= kSawReadySend;
441 }
442 
// Resets pSendingState to 0, i.e. clears every sending-state bit flag.
// The message argument is not used.
443 void OnWaitingToSend( [[maybe_unused]] Message *msg ) override
444 {
445 pSendingState = 0;
446 }
447 
448  private:
449 
450  // bit flags used with pSendingState
451  static constexpr int kSendDone = 0x0001;
452  static constexpr int kSawResp = 0x0002;
453  static constexpr int kFinalResp = 0x0004;
454  static constexpr int kSawReadySend = 0x0008;
455  static constexpr int kRetryAtSrv = 0x0010;
456  static constexpr int kInFlyDone = 0x0020;
457 
458  //------------------------------------------------------------------------
460  //------------------------------------------------------------------------
461  void HandleError( XRootDStatus status );
462 
463  //------------------------------------------------------------------------
465  //------------------------------------------------------------------------
466  Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
467 
468  //------------------------------------------------------------------------
470  //------------------------------------------------------------------------
471  void HandleResponse();
472 
473  //------------------------------------------------------------------------
475  //------------------------------------------------------------------------
476  XRootDStatus *ProcessStatus();
477 
478  //------------------------------------------------------------------------
481  //------------------------------------------------------------------------
482  Status ParseResponse( AnyObject *&response );
483 
484  //------------------------------------------------------------------------
487  //------------------------------------------------------------------------
488  Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
489 
490  //------------------------------------------------------------------------
493  //------------------------------------------------------------------------
494  Status RewriteRequestRedirect( const URL &newUrl );
495 
496  //------------------------------------------------------------------------
498  //------------------------------------------------------------------------
499  Status RewriteRequestWait();
500 
501  //------------------------------------------------------------------------
503  //------------------------------------------------------------------------
504  void UpdateTriedCGI(uint32_t errNo=0);
505 
506  //------------------------------------------------------------------------
508  //------------------------------------------------------------------------
509  void SwitchOnRefreshFlag();
510 
511  //------------------------------------------------------------------------
514  //------------------------------------------------------------------------
515  void HandleRspOrQueue();
516 
517  //------------------------------------------------------------------------
519  //------------------------------------------------------------------------
520  void HandleLocalRedirect( URL *url );
521 
522  //------------------------------------------------------------------------
527  //------------------------------------------------------------------------
528  bool IsRetriable();
529 
530  //------------------------------------------------------------------------
537  //------------------------------------------------------------------------
538  bool OmitWait( Message &request, const URL &url );
539 
540  //------------------------------------------------------------------------
546  //------------------------------------------------------------------------
547  bool RetriableErrorResponse( const Status &status );
548 
549  //------------------------------------------------------------------------
551  //------------------------------------------------------------------------
552  void DumpRedirectTraceBack();
553 
560  //------------------------------------------------------------------------
561  template<typename T>
562  Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
563 
564  //------------------------------------------------------------------------
571  //------------------------------------------------------------------------
572  Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
573 
574  //------------------------------------------------------------------------
582  //------------------------------------------------------------------------
583  Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
584  std::string &result );
585 
586  //------------------------------------------------------------------------
587  // Helper struct for async reading of chunks
588  //------------------------------------------------------------------------
589  struct ChunkStatus
590  {
591  ChunkStatus(): sizeError( false ), done( false ) {}
592  bool sizeError;
593  bool done;
594  };
595 
596  typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
597 
598  static const size_t CksumSize = sizeof( uint32_t );
599  static const size_t PageWithCksum = XrdSys::PageSize + CksumSize;
600  static const size_t MaxSslErrRetry = 3;
601 
// Count the pages contained in dlen bytes that start at the given offset.
//
// dlen is treated as a length in which every page (full or partial) is
// accompanied by a CksumSize-byte checksum: a full page therefore occupies
// PageWithCksum bytes. When offset is not page aligned, the first page is
// the shorter run up to the next page boundary and is counted separately.
//
// @param offset starting offset; only offset % XrdSys::PageSize is used
// @param dlen   number of bytes, checksums included
// @return       number of (full or partial) pages
602 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
603 {
604 uint32_t pgcnt = 0;
605 uint32_t remainder = offset % XrdSys::PageSize;
606 if( remainder > 0 )
607 {
608 // account for the first unaligned page
609 ++pgcnt;
610 // the size of the 1st unaligned page
611 uint32_t _1stpg = XrdSys::PageSize - remainder;
// if the data ends inside the first page, shrink it so that the
// subtraction below consumes all of dlen (dlen becomes 0)
// NOTE(review): for dlen < CksumSize this relies on unsigned wrap-around;
// the wrap cancels in the subtraction and dlen still ends up 0
612 if( _1stpg + CksumSize > dlen )
613 _1stpg = dlen - CksumSize;
614 dlen -= _1stpg + CksumSize;
615 }
// whole pages, plus one more for a trailing partial page
616 pgcnt += dlen / PageWithCksum;
617 if( dlen % PageWithCksum )
618 ++ pgcnt;
619 return pgcnt;
620 }
621 
622 //------------------------------------------------------------------------
623 // Used to track whether we need to release or timeout SID
//
// Returns true when pSendingState has kSawResp or kSendDone set and
// kInFlyDone is not set; false otherwise.
624 //------------------------------------------------------------------------
625 inline bool IsInFly() const
626 {
// read the atomic once so both tests below use the same value
627 const int sst = pSendingState;
628 if ( ( sst & ( kSawResp|kSendDone ) ) && !( sst & kInFlyDone ) )
629 return true;
630 return false;
631 }
632 
633  Message *pRequest;
634  std::shared_ptr<Message> pResponse; //< the ownership is shared with MsgReader
635  std::vector<std::shared_ptr<Message>> pPartialResps; //< the ownership is shared with MsgReader
636  ResponseHandler *pResponseHandler;
637  URL pUrl;
638  URL *pEffectiveDataServerUrl;
639  PostMaster *pPostMaster;
640  std::shared_ptr<SIDManager> pSidMgr;
641  LocalFileHandler *pLFileHandler;
642  XRootDStatus pStatus;
643  Status pLastError;
644  time_t pExpiration;
645  bool pRedirectAsAnswer;
646  bool pOksofarAsAnswer;
647  std::unique_ptr<HostList> pHosts;
648  bool pHasLoadBalancer;
649  HostInfo pLoadBalancer;
650  bool pHasSessionId;
651  std::string pRedirectUrl;
652  ChunkList *pChunkList;
653  std::vector<uint32_t> pCrc32cDigests;
654  XrdSys::KernelBuffer *pKBuff;
655  std::vector<ChunkStatus> pChunkStatus;
656  uint16_t pRedirectCounter;
657  uint16_t pNotAuthorizedCounter;
658 
659  uint32_t pAsyncOffset;
660  uint32_t pAsyncChunkIndex;
661 
662  std::unique_ptr<AsyncPageReader> pPageReader;
663  std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
664 
665  Buffer pPgWrtCksumBuff;
666  uint32_t pPgWrtCurrentPageOffset;
667  uint32_t pPgWrtCurrentPageNb;
668 
669  bool pOtherRawStarted;
670 
671  bool pFollowMetalink;
672 
673  bool pStateful;
674  int pAggregatedWaitTime;
675 
676  std::unique_ptr<RedirectEntry> pRdirEntry;
677  RedirectTraceBack pRedirectTraceBack;
678 
679  std::atomic<int> pSendingState;
680 
681  //------------------------------------------------------------------------
682  // true if MsgHandler is both in inQueue and installed in respective
683  // Stream (this could happen if server gave oksofar response), otherwise
684  // false
685  //------------------------------------------------------------------------
686  std::atomic<bool> pTimeoutFence;
687 
688  //------------------------------------------------------------------------
689  // if we are serving chunked data to the user's handler in case of
690  // kXR_dirlist we need to memorize if the response contains stat info or
691  // not (the information is only encoded in the first chunk)
692  //------------------------------------------------------------------------
693  bool pDirListStarted;
694  bool pDirListWithStat;
695 
696  //------------------------------------------------------------------------
697  // synchronization is needed in case the MsgHandler has been configured
698  // to serve kXR_oksofar as a response to the user's handler
699  //------------------------------------------------------------------------
700  XrdSysCondVar pCV;
701 
702  //------------------------------------------------------------------------
703  // Count of consecutive `errTlsSslError` errors
704  //------------------------------------------------------------------------
705  size_t pSslErrCnt;
706 
707  //------------------------------------------------------------------------
708  // Used in case we need to delay retry sending while awaiting the initial
709  // message sent confirmation.
710  //------------------------------------------------------------------------
711  URL pRetryAtUrl;
712  RedirectEntry::Type pRetryAtEntryType;
713  };
714 }
715 
716 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
kXR_unt16 requestid
Definition: XProtocol.hh:159
@ kXR_read
Definition: XProtocol.hh:126
@ kXR_readv
Definition: XProtocol.hh:138
@ kXR_pgread
Definition: XProtocol.hh:143
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
A network socket.
Definition: XrdClSocket.hh:43
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:344
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
const Message * GetRequest() const
Get the request pointer.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void OnReadyToSend([[maybe_unused]] Message *msg) override
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void Process() override
Process the message if it was "taken" by the examine action.
void OnWaitingToSend([[maybe_unused]] Message *msg) override
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
void SetStateful(bool stateful)
void SetOksofarAsAnswer(bool oksofarAsAnswer)
time_t GetExpiration() override
Get a timestamp after which we give up.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
virtual bool IsRaw() const override
Are we a raw writer or not?
void SetRedirectAsAnswer(bool redirectAsAnswer)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
const uint64_t ExDbgMsg
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
URL url
URL of the host.
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.
Definition: XrdClStatus.hh:115