XRootD
XrdCl::XRootDMsgHandler Class Reference

Handle/Process/Forward XRootD messages. More...

#include <XrdClXRootDMsgHandler.hh>

+ Inheritance diagram for XrdCl::XRootDMsgHandler:
+ Collaboration diagram for XrdCl::XRootDMsgHandler:

Public Member Functions

 XRootDMsgHandler (Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
 
 ~XRootDMsgHandler ()
 Destructor. More...
 
virtual uint16_t Examine (std::shared_ptr< Message > &msg) override
 
time_t GetExpiration () override
 Get a timestamp after which we give up. More...
 
const Message * GetRequest () const
 Get the request pointer. More...
 
virtual uint16_t GetSid () const override
 
virtual uint16_t InspectStatusRsp () override
 
virtual bool IsRaw () const override
 Are we a raw writer or not? More...
 
void OnReadyToSend ([[maybe_unused]] Message *msg) override
 
virtual void OnStatusReady (const Message *message, XRootDStatus status) override
 The requested action has been performed and the status is available. More...
 
virtual uint8_t OnStreamEvent (StreamEvent event, XRootDStatus status) override
 
void OnWaitingToSend ([[maybe_unused]] Message *msg) override
 
void PartialReceived ()
 
virtual void Process () override
 Process the message if it was "taken" by the examine action. More...
 
virtual XRootDStatus ReadMessageBody (Message *msg, Socket *socket, uint32_t &bytesRead) override
 
void SetChunkList (ChunkList *chunkList)
 Set the chunk list. More...
 
void SetCrc32cDigests (std::vector< uint32_t > &&crc32cDigests)
 
void SetExpiration (time_t expiration)
 Set a timestamp after which we give up. More...
 
void SetFollowMetalink (bool followMetalink)
 
void SetHostList (HostList *hostList)
 Set host list. More...
 
void SetKernelBuffer (XrdSys::KernelBuffer *kbuff)
 Set the kernel buffer. More...
 
void SetLoadBalancer (const HostInfo &loadBalancer)
 Set the load balancer. More...
 
void SetOksofarAsAnswer (bool oksofarAsAnswer)
 
void SetRedirectAsAnswer (bool redirectAsAnswer)
 
void SetRedirectCounter (uint16_t redirectCounter)
 Set the redirect counter. More...
 
void SetStateful (bool stateful)
 
void WaitDone (time_t now)
 
XRootDStatus WriteMessageBody (Socket *socket, uint32_t &bytesWritten) override
 
- Public Member Functions inherited from XrdCl::MsgHandler
virtual ~MsgHandler ()
 Event types that the message handler may receive. More...
 
virtual void OnReadyToSend (Message *msg)
 
virtual void OnWaitingToSend (Message *msg)
 Called to indicate the message is waiting to be sent. More...
 

Friends

class HandleRspJob
 

Additional Inherited Members

- Public Types inherited from XrdCl::MsgHandler
enum  Action {
  None = 0x0000 ,
  Nop = 0x0001 ,
  Ignore = 0x0002 ,
  RemoveHandler = 0x0004 ,
  Raw = 0x0008 ,
  NoProcess = 0x0010 ,
  Corrupted = 0x0020 ,
  More = 0x0040
}
 Actions to be taken after a message is processed by the handler. More...
 
enum  StreamEvent {
  Ready = 1 ,
  Broken = 2 ,
  Timeout = 3 ,
  FatalError = 4
}
 Events that may have occurred to the stream. More...
 

Detailed Description

Handle/Process/Forward XRootD messages.

Definition at line 119 of file XrdClXRootDMsgHandler.hh.

Constructor & Destructor Documentation

◆ XRootDMsgHandler()

XrdCl::XRootDMsgHandler::XRootDMsgHandler ( Message * msg,
ResponseHandler * respHandler,
const URL * url,
std::shared_ptr< SIDManager > sidMgr,
LocalFileHandler * lFileHandler
)
inline

Constructor

Parameters
msg: message that has been sent out
respHandler: response handler to be called when the final response arrives
url: the url the message has been sent to
sidMgr: the sid manager used to allocate SID for the initial message

Definition at line 134 of file XrdClXRootDMsgHandler.hh.

138  :
139  pRequest( msg ),
140  pResponseHandler( respHandler ),
141  pUrl( *url ),
142  pEffectiveDataServerUrl( 0 ),
143  pSidMgr( sidMgr ),
144  pLFileHandler( lFileHandler ),
145  pExpiration( 0 ),
146  pRedirectAsAnswer( false ),
147  pOksofarAsAnswer( false ),
148  pHasLoadBalancer( false ),
149  pHasSessionId( false ),
150  pChunkList( 0 ),
151  pKBuff( 0 ),
152  pRedirectCounter( 0 ),
153  pNotAuthorizedCounter( 0 ),
154 
155  pAsyncOffset( 0 ),
156  pAsyncChunkIndex( 0 ),
157 
158  pPgWrtCksumBuff( 4 ),
159  pPgWrtCurrentPageOffset( 0 ),
160  pPgWrtCurrentPageNb( 0 ),
161 
162  pOtherRawStarted( false ),
163 
164  pFollowMetalink( false ),
165 
166  pStateful( false ),
167 
168  pAggregatedWaitTime( 0 ),
169 
170  pSendingState( 0 ),
171 
172  pTimeoutFence( false ),
173 
174  pDirListStarted( false ),
175  pDirListWithStat( false ),
176 
177  pCV( 0 ),
178 
179  pSslErrCnt( 0 )
180  {
181  pPostMaster = DefaultEnv::GetPostMaster();
182  if( msg->GetSessionId() )
183  pHasSessionId = true;
184 
185  Log *log = DefaultEnv::GetLog();
186  log->Debug( ExDbgMsg, "[%s] MsgHandler created: %p (message: %s ).",
187  pUrl.GetHostId().c_str(), (void*)this,
188  pRequest->GetObfuscatedDescription().c_str() );
189 
190  ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
191  if( ntohs( hdr->requestid ) == kXR_pgread )
192  {
193  ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
194  pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
195  ntohl( pgrdreq->rlen ) ) );
196  }
197 
198  //----------------------------------------------------------------------
199  // Pass the reader our pUrl, not *url. The latter is a reference, likely
200  // from FileStateHandler such as *pDataServer. Accessing that throughout
201  // our lifetime may lead to concurrent access. In the case of read-
202  // recovery the FileStateHandler may entirely reallocate the url object.
203  //----------------------------------------------------------------------
204  if( ntohs( hdr->requestid ) == kXR_readv )
205  pBodyReader.reset( new AsyncVectorReader( pUrl, *pRequest ) );
206  else if( ntohs( hdr->requestid ) == kXR_read )
207  pBodyReader.reset( new AsyncRawReader( pUrl, *pRequest ) );
208  else
209  pBodyReader.reset( new AsyncDiscardReader( pUrl, *pRequest ) );
210  }
kXR_unt16 requestid
Definition: XProtocol.hh:159
@ kXR_read
Definition: XProtocol.hh:126
@ kXR_readv
Definition: XProtocol.hh:138
@ kXR_pgread
Definition: XProtocol.hh:143
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint64_t ExDbgMsg
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Message::GetSessionId(), kXR_pgread, kXR_read, kXR_readv, ClientPgReadRequest::offset, ClientRequestHdr::requestid, and ClientPgReadRequest::rlen.

+ Here is the call graph for this function:

◆ ~XRootDMsgHandler()

XrdCl::XRootDMsgHandler::~XRootDMsgHandler ( )
inline

Destructor.

Definition at line 215 of file XrdClXRootDMsgHandler.hh.

216  {
217  DumpRedirectTraceBack();
218 
219  if( !pHasSessionId )
220  delete pRequest;
221  delete pEffectiveDataServerUrl;
222 
223  pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
224  pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
225  pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
226  pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
227  pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
228  pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
229 
230  Log *log = DefaultEnv::GetLog();
231  log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: %p.",
232  pUrl.GetHostId().c_str(), (void*)this );
233  }
std::vector< ChunkInfo > ChunkList
List of chunks.

References XrdCl::Log::Debug(), XrdCl::ExDbgMsg, XrdCl::URL::GetHostId(), and XrdCl::DefaultEnv::GetLog().

+ Here is the call graph for this function:

Member Function Documentation

◆ Examine()

uint16_t XrdCl::XRootDMsgHandler::Examine ( std::shared_ptr< Message > &  msg)
override, virtual

Examine an incoming message, and decide on the action to be taken

Parameters
msg: the message, may be zero if receive failed
Returns
action type that needs to be taken with respect to the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 109 of file XrdClXRootDMsgHandler.cc.

110  {
111  const int sst = pSendingState.fetch_or( kSawResp );
112 
113  if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114  {
115  // we must have been sent although we haven't got the OnStatusReady
116  // notification yet. Set the inflight notice.
117 
118  Log *log = DefaultEnv::GetLog();
119  log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120  "that it was sent, assuming it was sent ok.",
121  pUrl.GetHostId().c_str(),
122  pRequest->GetObfuscatedDescription().c_str() );
123  }
124 
125  //--------------------------------------------------------------------------
126  // if the MsgHandler is already being used to process another request
127  // (kXR_oksofar) we need to wait
128  //--------------------------------------------------------------------------
129  if( pOksofarAsAnswer )
130  {
131  XrdSysCondVarHelper lck( pCV );
132  while( pResponse ) pCV.Wait();
133  }
134  else
135  {
136  if( pResponse )
137  {
138  Log *log = DefaultEnv::GetLog();
139  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
140  "it already owns a response: %p (message: %s ).",
141  pUrl.GetHostId().c_str(), (void*)this,
142  pRequest->GetObfuscatedDescription().c_str() );
143  }
144  }
145 
146  if( msg->GetSize() < 8 )
147  return Ignore;
148 
149  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
150  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
151  uint16_t status = 0;
152  uint32_t dlen = 0;
153 
154  //--------------------------------------------------------------------------
155  // We only care about async responses, but those are extracted now
156  // in the SocketHandler.
157  //--------------------------------------------------------------------------
158  if( rsp->hdr.status == kXR_attn )
159  {
160  return Ignore;
161  }
162  //--------------------------------------------------------------------------
163  // We got a sync message - check if it belongs to us
164  //--------------------------------------------------------------------------
165  else
166  {
167  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
168  rsp->hdr.streamid[1] != req->header.streamid[1] )
169  return Ignore;
170 
171  status = rsp->hdr.status;
172  dlen = rsp->hdr.dlen;
173  }
174 
175  //--------------------------------------------------------------------------
176  // We take the ownership of the message and decide what we will do
177  // with the handler itself, the options are:
178  // 1) we want to either read in raw mode (the Raw flag) or have the message
179  // body reconstructed for us by the TransportHandler by the time
180  // Process() is called (default, no extra flag)
181  // 2) we either got a full response in which case we don't want to be
182  // notified about anything anymore (RemoveHandler) or we got a partial
183  // answer and we need to wait for more (default, no extra flag)
184  //--------------------------------------------------------------------------
185  pResponse = msg;
186  pBodyReader->SetDataLength( dlen );
187 
188  Log *log = DefaultEnv::GetLog();
189  switch( status )
190  {
191  //------------------------------------------------------------------------
192  // Handle the cached cases
193  //------------------------------------------------------------------------
194  case kXR_error:
195  case kXR_redirect:
196  case kXR_wait:
197  return RemoveHandler;
198 
199  case kXR_waitresp:
200  {
201  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
202  "message %s", pUrl.GetHostId().c_str(),
203  pRequest->GetObfuscatedDescription().c_str() );
204 
205  pResponse.reset();
206  return Ignore; // This must be handled synchronously!
207  }
208 
209  //------------------------------------------------------------------------
210  // Handle the potential raw cases
211  //------------------------------------------------------------------------
212  case kXR_ok:
213  {
214  //----------------------------------------------------------------------
215  // For kXR_read we read in raw mode
216  //----------------------------------------------------------------------
217  uint16_t reqId = ntohs( req->header.requestid );
218  if( reqId == kXR_read )
219  {
220  return Raw | RemoveHandler;
221  }
222 
223  //----------------------------------------------------------------------
224  // kXR_readv is the same as kXR_read
225  //----------------------------------------------------------------------
226  if( reqId == kXR_readv )
227  {
228  return Raw | RemoveHandler;
229  }
230 
231  //----------------------------------------------------------------------
232  // For everything else we just take what we got
233  //----------------------------------------------------------------------
234  return RemoveHandler;
235  }
236 
237  //------------------------------------------------------------------------
238  // kXR_oksofars are special, they are not full responses, so we reset
239  // the response pointer to 0 and add the message to the partial list
240  //------------------------------------------------------------------------
241  case kXR_oksofar:
242  {
243  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
244  "%s", pUrl.GetHostId().c_str(),
245  pRequest->GetObfuscatedDescription().c_str() );
246 
247  if( !pOksofarAsAnswer )
248  {
249  pPartialResps.emplace_back( std::move( pResponse ) );
250  }
251 
252  //----------------------------------------------------------------------
253  // For kXR_read we either read in raw mode if the message has not
254  // been fully reconstructed already, if it has, we adjust
255  // the buffer offset to prepare for the next one
256  //----------------------------------------------------------------------
257  uint16_t reqId = ntohs( req->header.requestid );
258  if( reqId == kXR_read )
259  {
260  pTimeoutFence.store( true, std::memory_order_relaxed );
261  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
262  }
263 
264  //----------------------------------------------------------------------
265  // kXR_readv is similar to read, except that the payload is different
266  //----------------------------------------------------------------------
267  if( reqId == kXR_readv )
268  {
269  pTimeoutFence.store( true, std::memory_order_relaxed );
270  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
271  }
272 
273  return ( pOksofarAsAnswer ? None : NoProcess );
274  }
275 
276  case kXR_status:
277  {
278  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
279  "%s", pUrl.GetHostId().c_str(),
280  pRequest->GetObfuscatedDescription().c_str() );
281 
282  uint16_t reqId = ntohs( req->header.requestid );
283  if( reqId == kXR_pgwrite )
284  {
285  //--------------------------------------------------------------------
286  // In case of pgwrite by definition this wont be a partial response
287  // so we can already remove the handler from the in-queue
288  //--------------------------------------------------------------------
289  return RemoveHandler;
290  }
291 
292  //----------------------------------------------------------------------
293  // Otherwise (pgread), first of all we need to read the body of the
294  // kXR_status response, we can handle the raw data (if any) only after
295  // we have the whole kXR_status body
296  //----------------------------------------------------------------------
297  pTimeoutFence.store( true, std::memory_order_relaxed );
298  return None;
299  }
300 
301  //------------------------------------------------------------------------
302  // Default
303  //------------------------------------------------------------------------
304  default:
305  return RemoveHandler;
306  }
307  return RemoveHandler;
308  }
kXR_char streamid[2]
Definition: XProtocol.hh:158
kXR_char streamid[2]
Definition: XProtocol.hh:956
@ kXR_waitresp
Definition: XProtocol.hh:948
@ kXR_redirect
Definition: XProtocol.hh:946
@ kXR_oksofar
Definition: XProtocol.hh:942
@ kXR_status
Definition: XProtocol.hh:949
@ kXR_ok
Definition: XProtocol.hh:941
@ kXR_attn
Definition: XProtocol.hh:943
@ kXR_wait
Definition: XProtocol.hh:947
@ kXR_error
Definition: XProtocol.hh:945
struct ClientRequestHdr header
Definition: XProtocol.hh:887
@ kXR_pgwrite
Definition: XProtocol.hh:139
ServerResponseHeader hdr
Definition: XProtocol.hh:1330
@ Ignore
Ignore the message.
const uint64_t XRootDMsg

References ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::ExDbgMsg, XrdCl::Buffer::GetBuffer(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, kXR_attn, kXR_error, kXR_ok, kXR_oksofar, kXR_pgwrite, kXR_read, kXR_readv, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::MsgHandler::None, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseHeader::status, ClientRequestHdr::streamid, ServerResponseHeader::streamid, XrdSysCondVar::Wait(), XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ GetExpiration()

time_t XrdCl::XRootDMsgHandler::GetExpiration ( )
inline, override, virtual

Get a timestamp after which we give up.

Implements XrdCl::MsgHandler.

Definition at line 336 of file XrdClXRootDMsgHandler.hh.

337  {
338  return pExpiration;
339  }

◆ GetRequest()

const Message* XrdCl::XRootDMsgHandler::GetRequest ( ) const
inline

Get the request pointer.

Definition at line 362 of file XrdClXRootDMsgHandler.hh.

363  {
364  return pRequest;
365  }

◆ GetSid()

uint16_t XrdCl::XRootDMsgHandler::GetSid ( ) const
override, virtual

Get handler sid

return sid of the corresponding request, otherwise 0

Implements XrdCl::MsgHandler.

Definition at line 403 of file XrdClXRootDMsgHandler.cc.

404  {
405  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
406  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
407  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, and ClientRequestHdr::streamid.

+ Here is the call graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::XRootDMsgHandler::InspectStatusRsp ( )
override, virtual

Reexamine the incoming message, and decide on the action to be taken

In case of kXR_status the message can be only fully examined after reading the whole body (without raw data).

Parameters
msg: the message, may be zero if receive failed
Returns
action type that needs to be taken with respect to the message and the handler

Implements XrdCl::MsgHandler.

Definition at line 313 of file XrdClXRootDMsgHandler.cc.

314  {
315  if( !pResponse )
316  return 0;
317 
318  Log *log = DefaultEnv::GetLog();
319  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
320 
321  //--------------------------------------------------------------------------
322  // Additional action is only required for kXR_status
323  //--------------------------------------------------------------------------
324  if( rsp->hdr.status != kXR_status ) return 0;
325 
326  //--------------------------------------------------------------------------
327  // Ignore malformed status response
328  //--------------------------------------------------------------------------
329  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
330  {
331  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
332  return Corrupted;
333  }
334 
335  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
336  uint16_t reqId = ntohs( req->header.requestid );
337  //--------------------------------------------------------------------------
338  // Unmarshal the status body
339  //--------------------------------------------------------------------------
340  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
341 
342  if( !st.IsOK() && st.code == errDataError )
343  {
344  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
345  st.GetErrorMessage().c_str() );
346  return Corrupted;
347  }
348 
349  if( !st.IsOK() )
350  {
351  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
352  pUrl.GetHostId().c_str() );
353  pStatus = st;
354  HandleRspOrQueue();
355  return Ignore;
356  }
357 
358  //--------------------------------------------------------------------------
359  // Common handling for partial results
360  //--------------------------------------------------------------------------
361 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
362 if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
363 {
364  pPartialResps.push_back( std::move( pResponse ) );
365  }
366 
367  //--------------------------------------------------------------------------
368  // Decide the actions that we need to take
369  //--------------------------------------------------------------------------
370  uint16_t action = 0;
371  if( reqId == kXR_pgread )
372  {
373  //----------------------------------------------------------------------
374  // The message contains only Status header and body but no raw data
375  //----------------------------------------------------------------------
376  if( !pPageReader )
377  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
378  pPageReader->SetRsp( rspst );
379 
380  action |= Raw;
381 
382 if( rspst->status.bdy.resptype == XrdProto::kXR_PartialResult )
383 action |= NoProcess;
384  else
385  action |= RemoveHandler;
386  }
387  else if( reqId == kXR_pgwrite )
388  {
389  // if data corruption has been detected on the server side we will
390  // send some additional data pointing to the pages that need to be
391  // retransmitted
392  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
393  pResponse->GetCursor() )
394  action |= More;
395  }
396 
397  return action;
398  }
ServerResponseStatus status
Definition: XProtocol.hh:1352
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1304
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1303
@ More
there are more (non-raw) data to be read
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
@ kXR_PartialResult
Definition: XProtocol.hh:1293

References ServerResponseStatus::bdy, XrdCl::Status::code, XrdCl::MsgHandler::Corrupted, ServerResponseHeader::dlen, ServerResponseBody_Status::dlen, XrdCl::errDataError, XrdCl::Log::Error(), XrdCl::Buffer::GetBuffer(), XrdCl::XRootDStatus::GetErrorMessage(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), ServerResponseStatus::hdr, ServerResponse::hdr, ClientRequest::header, XrdCl::MsgHandler::Ignore, XrdCl::Status::IsOK(), XrdProto::kXR_PartialResult, kXR_pgread, kXR_pgwrite, kXR_status, XrdCl::MsgHandler::More, XrdCl::MsgHandler::NoProcess, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, ClientRequestHdr::requestid, ServerResponseBody_Status::resptype, ServerResponseHeader::status, ServerResponseV2::status, XrdCl::XRootDTransport::UnMarshalStatusBody(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ IsRaw()

bool XrdCl::XRootDMsgHandler::IsRaw ( ) const
override, virtual

Are we a raw writer or not?

Reimplemented from XrdCl::MsgHandler.

Definition at line 996 of file XrdClXRootDMsgHandler.cc.

997  {
998  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
999  uint16_t reqId = ntohs( req->header.requestid );
1000  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
1001  return true;
1002  // checkpoint + execute
1003  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
1004  {
1005  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
1006  reqId = ntohs( xeq->header.requestid );
1007  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
1008  }
1009 
1010  return false;
1011  }
static const int kXR_ckpXeq
Definition: XProtocol.hh:218
@ kXR_writev
Definition: XProtocol.hh:144
@ kXR_write
Definition: XProtocol.hh:132
@ kXR_truncate
Definition: XProtocol.hh:141
@ kXR_chkpoint
Definition: XProtocol.hh:125
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:890

References ClientRequest::chkpoint, XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_chkpoint, kXR_ckpXeq, kXR_pgwrite, kXR_truncate, kXR_write, kXR_writev, ClientChkPointRequest::opcode, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ OnReadyToSend()

void XrdCl::XRootDMsgHandler::OnReadyToSend ( [[maybe_unused]] Message * msg )
inline, override

Definition at line 438 of file XrdClXRootDMsgHandler.hh.

439  {
440  pSendingState |= kSawReadySend;
441  }

◆ OnStatusReady()

void XrdCl::XRootDMsgHandler::OnStatusReady ( const Message * message,
XRootDStatus  status
)
override, virtual

The requested action has been performed and the status is available.

Implements XrdCl::MsgHandler.

Definition at line 919 of file XrdClXRootDMsgHandler.cc.

921  {
922  Log *log = DefaultEnv::GetLog();
923 
924  if( status.IsOK() )
925  {
926  log->Dump( XRootDMsg, "[%s] Got notification that outgoing message %s "
927  "was sent successfully.", pUrl.GetHostId().c_str(),
928  message->GetObfuscatedDescription().c_str() );
929  }
930 
931  // After setting kSendDone processing of this object may continue in
932  // another thread. Unless we're in an error condition our object may
933  // be modified or even destroyed after this point.
934  const int sst = pSendingState.fetch_or( kSendDone );
935 
936  // ignore if we're already in this state
937  if( status.IsOK() && ( sst & kSendDone ) ) return;
938 
939  // if we have already seen a response we should be getting notified
940  // of a successful send. But if not, log and do our best to recover.
941  if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
942  {
943  log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
944  "recover.", pUrl.GetHostId().c_str(),
945  message->GetObfuscatedDescription().c_str() );
946  HandleError( status );
947  return;
948  }
949 
950  if( sst & kFinalResp )
951  {
952  // late notification and we already have final response for user,
953  // need to queue handler callback.
954  HandleRspOrQueue();
955  return;
956  }
957 
958  if( sst & kRetryAtSrv )
959  {
960  // late notification and we already received a response and know
961  // we need to retry at differnt server.
962  HandleError( RetryAtServer( pRetryAtUrl, pRetryAtEntryType ) );
963  return;
964  }
965 
966  if( sst & kSawResp )
967  {
968  // late notification, response processing may be happening in another
969  // thread.
970  return;
971  }
972 
973  //--------------------------------------------------------------------------
974  // We were successful, so we now need to listen for a response
975  //--------------------------------------------------------------------------
976  if( status.IsOK() )
977  {
978  // this is the expcted order, we got the notificaiton but no response
979  // received yet. However another thread is liable to be processing
980  // one or sending a final response and deleting us at any point now.
981  return;
982  }
983 
984  //--------------------------------------------------------------------------
985  // We have failed, recover if possible
986  //--------------------------------------------------------------------------
987  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
988  "recover.", pUrl.GetHostId().c_str(),
989  message->GetObfuscatedDescription().c_str() );
990  HandleError( status );
991  }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Status::IsOK(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnStreamEvent()

uint8_t XrdCl::XRootDMsgHandler::OnStreamEvent ( StreamEvent  event,
XRootDStatus  status 
)
override, virtual

Handle an event other than a message arrival

Parameters
event: type of the event
status: status info

Reimplemented from XrdCl::MsgHandler.

Definition at line 880 of file XrdClXRootDMsgHandler.cc.

882  {
883  Log *log = DefaultEnv::GetLog();
884  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
885  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
886 
887  if( event == Ready )
888  return 0;
889 
890  if( pTimeoutFence.load( std::memory_order_relaxed ) )
891  return 0;
892 
893  HandleError( status );
894  return RemoveHandler;
895  }
@ Ready
The stream has become connected.

References XrdCl::Log::Dump(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::MsgHandler::Ready, XrdCl::MsgHandler::RemoveHandler, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ OnWaitingToSend()

void XrdCl::XRootDMsgHandler::OnWaitingToSend ( [[maybe_unused]] Message * msg )
inline, override

Definition at line 443 of file XrdClXRootDMsgHandler.hh.

444  {
445  pSendingState = 0;
446  }

◆ PartialReceived()

void XrdCl::XRootDMsgHandler::PartialReceived ( )

Bookkeeping after partial response has been received:

  • take down the timeout fence after oksofar response has been handled
  • reset status-response-body marshaled flag

Definition at line 1173 of file XrdClXRootDMsgHandler.cc.

1174  {
1175  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1176  }

Referenced by XrdCl::Stream::OnIncoming().

+ Here is the caller graph for this function:

◆ Process()

void XrdCl::XRootDMsgHandler::Process ( )
override virtual

Process the message if it was "taken" by the examine action.

Process the message if it was "taken" by the examine action

Parameters
msg: the message to be processed

Reimplemented from XrdCl::MsgHandler.

Definition at line 412 of file XrdClXRootDMsgHandler.cc.

413  {
414  Log *log = DefaultEnv::GetLog();
415 
416  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
417 
418  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
419 
420  //--------------------------------------------------------------------------
421  // If it is a local file, it can be only a metalink redirector
422  //--------------------------------------------------------------------------
423  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
424  pHosts->back().protocol = kXR_PROTOCOLVERSION;
425 
426  //--------------------------------------------------------------------------
427  // We got an answer, check who we were talking to
428  //--------------------------------------------------------------------------
429  else
430  {
431  AnyObject qryResult;
432  int *qryResponse = nullptr;
433  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
434  qryResult.Get( qryResponse );
435  if (qryResponse) {
436  pHosts->back().flags = *qryResponse;
437  delete qryResponse;
438  qryResponse = nullptr;
439  }
440  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
441  qryResult.Get( qryResponse );
442  if (qryResponse) {
443  pHosts->back().protocol = *qryResponse;
444  delete qryResponse;
445  }
446  }
447 
448  //--------------------------------------------------------------------------
449  // Process the message
450  //--------------------------------------------------------------------------
451  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
452  if( !st.IsOK() )
453  {
454  pStatus = Status( stFatal, errInvalidMessage );
455  HandleResponse();
456  return;
457  }
458 
459  //--------------------------------------------------------------------------
460  // we have an response for the message so it's not in fly anymore
461  //--------------------------------------------------------------------------
462  pSendingState.fetch_or( kInFlyDone );
463 
464  //--------------------------------------------------------------------------
465  // Reset the aggregated wait (used to omit wait response in case of Metalink
466  // redirector)
467  //--------------------------------------------------------------------------
468  if( rsp->hdr.status != kXR_wait )
469  pAggregatedWaitTime = 0;
470 
471  switch( rsp->hdr.status )
472  {
473  //------------------------------------------------------------------------
474  // kXR_ok - we're done here
475  //------------------------------------------------------------------------
476  case kXR_ok:
477  {
478  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
479  pUrl.GetHostId().c_str(),
480  pRequest->GetObfuscatedDescription().c_str() );
481  pStatus = Status();
482  HandleResponse();
483  return;
484  }
485 
486  case kXR_status:
487  {
488  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
489  pUrl.GetHostId().c_str(),
490  pRequest->GetObfuscatedDescription().c_str() );
491  pStatus = Status();
492  HandleResponse();
493  return;
494  }
495 
496  //------------------------------------------------------------------------
497  // kXR_ok - we're serving partial result to the user
498  //------------------------------------------------------------------------
499  case kXR_oksofar:
500  {
501  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
502  pUrl.GetHostId().c_str(),
503  pRequest->GetObfuscatedDescription().c_str() );
504  pStatus = Status( stOK, suContinue );
505  HandleResponse();
506  return;
507  }
508 
509  //------------------------------------------------------------------------
510  // kXR_error - we've got a problem
511  //------------------------------------------------------------------------
512  case kXR_error:
513  {
514  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
515  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
516  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
517  "[%d] %s", pUrl.GetHostId().c_str(),
518  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
519  errmsg );
520  delete [] errmsg;
521 
522  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
523  return;
524  }
525 
526  //------------------------------------------------------------------------
527  // kXR_redirect - they tell us to go elsewhere
528  //------------------------------------------------------------------------
529  case kXR_redirect:
530  {
531  if( rsp->hdr.dlen <= 4 )
532  {
533  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
534  pUrl.GetHostId().c_str() );
535  pStatus = Status( stError, errInvalidResponse );
536  HandleResponse();
537  return;
538  }
539 
540  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
541  urlInfoBuff[rsp->hdr.dlen-4] = 0;
542  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
543  std::string urlInfo = urlInfoBuff;
544  delete [] urlInfoBuff;
545  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
546  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
547  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
548  rsp->body.redirect.port );
549 
550  //----------------------------------------------------------------------
551  // Check if we can proceed
552  //----------------------------------------------------------------------
553  if( !pRedirectCounter )
554  {
555  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
556  "message %s, the last known error is: %s",
557  pUrl.GetHostId().c_str(),
558  pRequest->GetObfuscatedDescription().c_str(),
559  pLastError.ToString().c_str() );
560 
561 
562  pStatus = Status( stFatal, errRedirectLimit );
563  HandleResponse();
564  return;
565  }
566  --pRedirectCounter;
567 
568  //----------------------------------------------------------------------
569  // Keep the info about this server if we still need to find a load
570  // balancer
571  //----------------------------------------------------------------------
572  uint32_t flags = pHosts->back().flags;
573  if( !pHasLoadBalancer )
574  {
575  if( flags & kXR_isManager )
576  {
577  //------------------------------------------------------------------
578  // If the current server is a meta manager then it supersedes
579  // any existing load balancer, otherwise we assign a load-balancer
580  // only if it has not been already assigned
581  //------------------------------------------------------------------
582  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
583  {
584  pLoadBalancer = pHosts->back();
585  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
586  "as a load-balancer for message %s",
587  pUrl.GetHostId().c_str(),
588  pRequest->GetObfuscatedDescription().c_str() );
589  HostList::iterator it;
590  for( it = pHosts->begin(); it != pHosts->end(); ++it )
591  it->loadBalancer = false;
592  pHosts->back().loadBalancer = true;
593  }
594  }
595  }
596 
597  //----------------------------------------------------------------------
598  // If the redirect comes from a data server safe the URL because
599  // in case of a failure we will use it as the effective data server URL
600  // for the tried CGI opaque info
601  //----------------------------------------------------------------------
602  if( flags & kXR_isServer )
603  pEffectiveDataServerUrl = new URL( pHosts->back().url );
604 
605  //----------------------------------------------------------------------
606  // Build the URL and check it's validity
607  //----------------------------------------------------------------------
608  std::vector<std::string> urlComponents;
609  std::string newCgi;
610  Utils::splitString( urlComponents, urlInfo, "?" );
611 
612  std::ostringstream o;
613 
614  o << urlComponents[0];
615  if( rsp->body.redirect.port > 0 )
616  o << ":" << rsp->body.redirect.port << "/";
617  else if( rsp->body.redirect.port < 0 )
618  {
619  //--------------------------------------------------------------------
620  // check if the manager wants to enforce write recovery at himself
621  // (beware we are dealing here with negative flags)
622  //--------------------------------------------------------------------
623  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
624  pHosts->back().flags |= kXR_recoverWrts;
625 
626  //--------------------------------------------------------------------
627  // check if the manager wants to collapse the communication channel
628  // (the redirect host is to replace the current host)
629  //--------------------------------------------------------------------
630  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
631  {
632  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
633  pPostMaster->CollapseRedirect( pUrl, url );
634  }
635 
636  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
637  {
638  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
639  if( Utils::CheckEC( pRequest, url ) )
640  pRedirectAsAnswer = true;
641  }
642  }
643 
644  URL newUrl = URL( o.str() );
645  if( !newUrl.IsValid() )
646  {
647  pStatus = Status( stError, errInvalidRedirectURL );
648  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
649  pUrl.GetHostId().c_str(), urlInfo.c_str() );
650  HandleResponse();
651  return;
652  }
653 
654  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
655  newUrl.SetUserName( pUrl.GetUserName() );
656 
657  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
658  newUrl.SetPassword( pUrl.GetPassword() );
659 
660  //----------------------------------------------------------------------
661  // Forward any "xrd.*" params from the original client request also to
662  // the new redirection url
663  // Also, we need to preserve any "xrdcl.*' as they are important for
664  // our internal workflows.
665  //----------------------------------------------------------------------
666  std::ostringstream ossXrd;
667  const URL::ParamsMap &urlParams = pUrl.GetParams();
668 
669  for(URL::ParamsMap::const_iterator it = urlParams.begin();
670  it != urlParams.end(); ++it )
671  {
672  if( it->first.compare( 0, 4, "xrd." ) &&
673  it->first.compare( 0, 6, "xrdcl." ) )
674  continue;
675 
676  ossXrd << it->first << '=' << it->second << '&';
677  }
678 
679  std::string xrdCgi = ossXrd.str();
680  pRedirectUrl = newUrl.GetURL();
681 
682  URL cgiURL;
683  if( urlComponents.size() > 1 )
684  {
685  pRedirectUrl += "?";
686  pRedirectUrl += urlComponents[1];
687  std::ostringstream o;
688  o << "fake://fake:111//fake?";
689  o << urlComponents[1];
690 
691  if( urlComponents.size() == 3 )
692  o << '?' << urlComponents[2];
693 
694  if (!xrdCgi.empty())
695  {
696  o << '&' << xrdCgi;
697  pRedirectUrl += '&';
698  pRedirectUrl += xrdCgi;
699  }
700 
701  cgiURL = URL( o.str() );
702  }
703  else {
704  if (!xrdCgi.empty())
705  {
706  std::ostringstream o;
707  o << "fake://fake:111//fake?";
708  o << xrdCgi;
709  cgiURL = URL( o.str() );
710  pRedirectUrl += '?';
711  pRedirectUrl += xrdCgi;
712  }
713  }
714 
715  //----------------------------------------------------------------------
716  // Check if we need to return the URL as a response
717  //----------------------------------------------------------------------
718  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
719  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
720  !newUrl.IsLocalFile() )
721  pRedirectAsAnswer = true;
722 
723  if( pRedirectAsAnswer )
724  {
725  pStatus = Status( stError, errRedirect );
726  HandleResponse();
727  return;
728  }
729 
730  //----------------------------------------------------------------------
731  // Rewrite the message in a way required to send it to another server
732  //----------------------------------------------------------------------
733  newUrl.SetParams( cgiURL.GetParams() );
734  Status st = RewriteRequestRedirect( newUrl );
735  if( !st.IsOK() )
736  {
737  pStatus = st;
738  HandleResponse();
739  return;
740  }
741 
742  //----------------------------------------------------------------------
743  // Make sure we don't change the protocol by accident (root vs roots)
744  //----------------------------------------------------------------------
745  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
746  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
747  newUrl.SetProtocol( "roots" );
748 
749  //----------------------------------------------------------------------
750  // Send the request to the new location
751  //----------------------------------------------------------------------
752  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
753  return;
754  }
755 
756  //------------------------------------------------------------------------
757  // kXR_wait - we wait, and re-issue the request later
758  //------------------------------------------------------------------------
759  case kXR_wait:
760  {
761  uint32_t waitSeconds = 0;
762 
763  if( rsp->hdr.dlen >= 4 )
764  {
765  char *infoMsg = new char[rsp->hdr.dlen-3];
766  infoMsg[rsp->hdr.dlen-4] = 0;
767  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
768  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
769  "message %s: %s", pUrl.GetHostId().c_str(),
770  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
771  infoMsg );
772  delete [] infoMsg;
773  waitSeconds = rsp->body.wait.seconds;
774  }
775  else
776  {
777  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
778  "message %s", pUrl.GetHostId().c_str(),
779  pRequest->GetObfuscatedDescription().c_str() );
780  }
781 
782  pAggregatedWaitTime += waitSeconds;
783 
784  // We need a special case if the data node comes from metalink
785  // redirector. In this case it might make more sense to try the
786  // next entry in the Metalink than wait.
787  if( OmitWait( *pRequest, pLoadBalancer.url ) )
788  {
789  int maxWait = DefaultMaxMetalinkWait;
790  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
791  if( pAggregatedWaitTime > maxWait )
792  {
793  UpdateTriedCGI();
794  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
795  return;
796  }
797  }
798 
799  //----------------------------------------------------------------------
800  // Some messages require rewriting before they can be sent again
801  // after wait
802  //----------------------------------------------------------------------
803  Status st = RewriteRequestWait();
804  if( !st.IsOK() )
805  {
806  pStatus = st;
807  HandleResponse();
808  return;
809  }
810 
811  //----------------------------------------------------------------------
812  // Register a task to resend the message in some seconds, if we still
813  // have time to do that, and report a timeout otherwise
814  //----------------------------------------------------------------------
815  time_t resendTime = ::time(0)+waitSeconds;
816 
817  if( resendTime < pExpiration )
818  {
819  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
820  pUrl.GetHostId().c_str(), (void*)this,
821  pRequest->GetObfuscatedDescription().c_str() );
822 
823  TaskManager *taskMgr = pPostMaster->GetTaskManager();
824  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
825  }
826  else
827  {
828  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
829  pUrl.GetHostId().c_str(),
830  pRequest->GetObfuscatedDescription().c_str() );
831  HandleError( Status( stError, errOperationExpired) );
832  }
833  return;
834  }
835 
836  //------------------------------------------------------------------------
837  // kXR_waitresp - the response will be returned in some seconds as an
838  // unsolicited message. Currently all messages of this type are handled
839  // one step before in the XrdClStream::OnIncoming as they need to be
840  // processed synchronously.
841  //------------------------------------------------------------------------
842  case kXR_waitresp:
843  {
844  if( rsp->hdr.dlen < 4 )
845  {
846  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
847  pUrl.GetHostId().c_str() );
848  pStatus = Status( stError, errInvalidResponse );
849  HandleResponse();
850  return;
851  }
852 
853  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
854  "message %s", pUrl.GetHostId().c_str(),
855  rsp->body.waitresp.seconds,
856  pRequest->GetObfuscatedDescription().c_str() );
857  return;
858  }
859 
860  //------------------------------------------------------------------------
861  // Default - unrecognized/unsupported response, declare an error
862  //------------------------------------------------------------------------
863  default:
864  {
865  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
866  "message %s", pUrl.GetHostId().c_str(),
867  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
868  pStatus = Status( stError, errInvalidResponse );
869  HandleResponse();
870  return;
871  }
872  }
873 
874  return;
875  }
#define kXR_isManager
Definition: XProtocol.hh:1198
union ServerResponse::@0 body
#define kXR_collapseRedir
Definition: XProtocol.hh:1209
#define kXR_attrMeta
Definition: XProtocol.hh:1201
#define kXR_recoverWrts
Definition: XProtocol.hh:1208
#define kXR_isServer
Definition: XProtocol.hh:1199
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
#define kXR_ecRedir
Definition: XProtocol.hh:1210
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
void RegisterTask(Task *task, time_t time, bool own=true)
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
URL url
URL of the host.
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version

References ServerResponse::body, XrdCl::Utils::CheckEC(), XrdCl::PostMaster::CollapseRedirect(), XrdCl::Log::Debug(), XrdCl::DefaultMaxMetalinkWait, ServerResponseHeader::dlen, XrdCl::Log::Dump(), XrdCl::RedirectEntry::EntryRedirect, XrdCl::RedirectEntry::EntryRedirectOnWait, XrdCl::errErrorResponse, XrdCl::errInvalidMessage, XrdCl::errInvalidRedirectURL, XrdCl::errInvalidResponse, XrdCl::errOperationExpired, XrdCl::Log::Error(), XrdCl::errRedirect, XrdCl::errRedirectLimit, XrdCl::ExDbgMsg, XrdCl::AnyObject::Get(), XrdCl::Buffer::GetBuffer(), XrdCl::DefaultEnv::GetEnv(), XrdCl::URL::GetHostId(), XrdCl::Env::GetInt(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::URL::GetParams(), XrdCl::URL::GetPassword(), XrdCl::URL::GetProtocol(), XrdCl::PostMaster::GetTaskManager(), XrdCl::URL::GetURL(), XrdCl::URL::GetUserName(), ServerResponse::hdr, ClientRequest::header, XrdCl::URL::IsLocalFile(), XrdCl::URL::IsMetalink(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), kXR_attrMeta, kXR_collapseRedir, kXR_ecRedir, kXR_error, kXR_isManager, kXR_isServer, kXR_ok, kXR_oksofar, kXR_PROTOCOLVERSION, kXR_recoverWrts, kXR_redirect, kXR_status, kXR_wait, kXR_waitresp, XrdCl::XRootDQuery::ProtocolVersion, XrdCl::PostMaster::QueryTransport(), XrdCl::TaskManager::RegisterTask(), ClientRequestHdr::requestid, XrdCl::XRootDQuery::ServerFlags, XrdCl::URL::SetParams(), XrdCl::URL::SetPassword(), XrdCl::URL::SetProtocol(), XrdCl::URL::SetUserName(), XrdCl::Utils::splitString(), ServerResponseHeader::status, XrdCl::stError, XrdCl::stFatal, XrdCl::stOK, XrdCl::suContinue, XrdCl::Status::ToString(), XrdCl::XRootDTransport::UnMarshallBody(), XrdCl::HostInfo::url, XrdCl::Log::Warning(), and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

◆ ReadMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::ReadMessageBody ( Message * msg,
Socket * socket,
uint32_t & bytesRead
)
override virtual

Read message body directly from a socket - called if Examine returns Raw flag - only socket related errors may be returned here

Parameters
msg: the corresponding message header
socket: the socket to read from
bytesRead: number of bytes read by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data is needed; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 900 of file XrdClXRootDMsgHandler.cc.

903  {
904  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
905  uint16_t reqId = ntohs( req->header.requestid );
906 
907  if( reqId == kXR_pgread )
908  return pPageReader->Read( *socket, bytesRead );
909 
910  return pBodyReader->Read( *socket, bytesRead );
911  }

References XrdCl::Buffer::GetBuffer(), ClientRequest::header, kXR_pgread, and ClientRequestHdr::requestid.

+ Here is the call graph for this function:

◆ SetChunkList()

void XrdCl::XRootDMsgHandler::SetChunkList ( ChunkList * chunkList )
inline

Set the chunk list.

Definition at line 389 of file XrdClXRootDMsgHandler.hh.

390  {
391  pChunkList = chunkList;
392  if( pBodyReader )
393  pBodyReader->SetChunkList( chunkList );
394  if( chunkList )
395  pChunkStatus.resize( chunkList->size() );
396  else
397  pChunkStatus.clear();
398  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetCrc32cDigests()

void XrdCl::XRootDMsgHandler::SetCrc32cDigests ( std::vector< uint32_t > &&  crc32cDigests)
inline

Definition at line 400 of file XrdClXRootDMsgHandler.hh.

401  {
402  pCrc32cDigests = std::move( crc32cDigests );
403  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetExpiration()

void XrdCl::XRootDMsgHandler::SetExpiration ( time_t  expiration)
inline

Set a timestamp after which we give up.

Definition at line 328 of file XrdClXRootDMsgHandler.hh.

329  {
330  pExpiration = expiration;
331  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetFollowMetalink()

void XrdCl::XRootDMsgHandler::SetFollowMetalink ( bool  followMetalink)
inline

Definition at line 421 of file XrdClXRootDMsgHandler.hh.

422  {
423  pFollowMetalink = followMetalink;
424  }

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the caller graph for this function:

◆ SetHostList()

void XrdCl::XRootDMsgHandler::SetHostList ( HostList * hostList )
inline

Set host list.

Definition at line 381 of file XrdClXRootDMsgHandler.hh.

382  {
383  pHosts.reset( hostList );
384  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetKernelBuffer()

void XrdCl::XRootDMsgHandler::SetKernelBuffer ( XrdSys::KernelBuffer * kbuff )
inline

Set the kernel buffer.

Definition at line 408 of file XrdClXRootDMsgHandler.hh.

409  {
410  pKBuff = kbuff;
411  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetLoadBalancer()

void XrdCl::XRootDMsgHandler::SetLoadBalancer ( const HostInfo & loadBalancer )
inline

Set the load balancer.

Definition at line 370 of file XrdClXRootDMsgHandler.hh.

371  {
372  if( !loadBalancer.url.IsValid() )
373  return;
374  pLoadBalancer = loadBalancer;
375  pHasLoadBalancer = true;
376  }

References XrdCl::URL::IsValid(), and XrdCl::HostInfo::url.

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetOksofarAsAnswer()

void XrdCl::XRootDMsgHandler::SetOksofarAsAnswer ( bool  oksofarAsAnswer)
inline

Treat the kXR_oksofar response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 354 of file XrdClXRootDMsgHandler.hh.

355  {
356  pOksofarAsAnswer = oksofarAsAnswer;
357  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectAsAnswer()

void XrdCl::XRootDMsgHandler::SetRedirectAsAnswer ( bool  redirectAsAnswer)
inline

Treat the kXR_redirect response as a valid answer to the message and notify the handler with the URL as a response

Definition at line 345 of file XrdClXRootDMsgHandler.hh.

346  {
347  pRedirectAsAnswer = redirectAsAnswer;
348  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetRedirectCounter()

void XrdCl::XRootDMsgHandler::SetRedirectCounter ( uint16_t  redirectCounter)
inline

Set the redirect counter.

Definition at line 416 of file XrdClXRootDMsgHandler.hh.

417  {
418  pRedirectCounter = redirectCounter;
419  }

Referenced by XrdCl::MessageUtils::RedirectMessage(), and XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetStateful()

void XrdCl::XRootDMsgHandler::SetStateful ( bool  stateful)
inline

Definition at line 426 of file XrdClXRootDMsgHandler.hh.

427  {
428  pStateful = stateful;
429  }

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ WaitDone()

void XrdCl::XRootDMsgHandler::WaitDone ( time_t  now)

Called after the wait time for kXR_wait has elapsed

Parameters
now: current timestamp

Definition at line 1165 of file XrdClXRootDMsgHandler.cc.

1166  {
1167  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1168  }

References XrdCl::RedirectEntry::EntryWait.

◆ WriteMessageBody()

XRootDStatus XrdCl::XRootDMsgHandler::WriteMessageBody ( Socket * socket,
uint32_t & bytesWritten
)
override virtual

Write message body directly to a socket - called if IsRaw returns true - only socket related errors may be returned here

Parameters
socket: the socket to write to
bytesWritten: number of bytes written by the method
Returns
stOK & suDone if the whole body has been processed; stOK & suRetry if more data needs to be written; stError on failure

Reimplemented from XrdCl::MsgHandler.

Definition at line 1016 of file XrdClXRootDMsgHandler.cc.

1018  {
1019  //--------------------------------------------------------------------------
1020  // First check if it is a PgWrite
1021  //--------------------------------------------------------------------------
1022  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1023  {
1024  //------------------------------------------------------------------------
1025  // PgWrite will have just one chunk
1026  //------------------------------------------------------------------------
1027  ChunkInfo chunk = pChunkList->front();
1028  //------------------------------------------------------------------------
1029  // Calculate the size of the first and last page (in case the chunk is not
1030  // 4KB aligned)
1031  //------------------------------------------------------------------------
1032  int fLen = 0, lLen = 0;
1033  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1034 
1035  //------------------------------------------------------------------------
1036  // Set the crc32c buffer if not ready yet
1037  //------------------------------------------------------------------------
1038  if( pPgWrtCksumBuff.GetCursor() == 0 )
1039  {
1040  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1041  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1042  }
1043 
1044  uint32_t btsLeft = chunk.length - pAsyncOffset;
1045  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1046  if( pglen > btsLeft ) pglen = btsLeft;
1047  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1048 
1049  while( btsLeft > 0 )
1050  {
1051  // first write the crc32c digest
1052  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1053  {
1054  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1055  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1056  int btswrt = 0;
1057  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1058  if( !st.IsOK() ) return st;
1059  bytesWritten += btswrt;
1060  pPgWrtCksumBuff.AdvanceCursor( btswrt );
1061  if( st.code == suRetry ) return st;
1062  }
1063  // then write the raw data (one page)
1064  int btswrt = 0;
1065  Status st = socket->Send( pgbuf, pglen, btswrt );
1066  if( !st.IsOK() ) return st;
1067  pgbuf += btswrt;
1068  pglen -= btswrt;
1069  btsLeft -= btswrt;
1070  bytesWritten += btswrt;
1071  pAsyncOffset += btswrt; // update the offset to the raw data
1072  if( st.code == suRetry ) return st;
1073  // if we managed to write all the data ...
1074  if( pglen == 0 )
1075  {
1076  // move to the next page
1077  ++pPgWrtCurrentPageNb;
1078  if( pPgWrtCurrentPageNb < nbpgs )
1079  {
1080  // set the digest buffer
1081  pPgWrtCksumBuff.SetCursor( 0 );
1082  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1083  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1084  }
1085  // set the page length
1086  pglen = XrdSys::PageSize;
1087  if( pglen > btsLeft ) pglen = btsLeft;
1088  // reset offset in the current page
1089  pPgWrtCurrentPageOffset = 0;
1090  }
1091  else
1092  // otherwise just adjust the offset in the current page
1093  pPgWrtCurrentPageOffset += btswrt;
1094 
1095  }
1096  }
1097  else if( !pChunkList->empty() )
1098  {
1099  size_t size = pChunkList->size();
1100  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1101  {
1102  char *buffer = (char*)(*pChunkList)[i].buffer;
1103  uint32_t size = (*pChunkList)[i].length;
1104  size_t leftToBeWritten = size - pAsyncOffset;
1105 
1106  while( leftToBeWritten )
1107  {
1108  int btswrt = 0;
1109  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1110  bytesWritten += btswrt;
1111  if( !st.IsOK() || st.code == suRetry ) return st;
1112  pAsyncOffset += btswrt;
1113  leftToBeWritten -= btswrt;
1114  }
1115  //----------------------------------------------------------------------
1116  // Remember that we have moved to the next chunk, also clear the offset
1117  // within the buffer as we are going to move to a new one
1118  //----------------------------------------------------------------------
1119  ++pAsyncChunkIndex;
1120  pAsyncOffset = 0;
1121  }
1122  }
1123  else
1124  {
1125  Log *log = DefaultEnv::GetLog();
1126 
1127  //------------------------------------------------------------------------
1128  // If the socket is encrypted we cannot use a kernel buffer, we have to
1129  // convert to user space buffer
1130  //------------------------------------------------------------------------
1131  if( socket->IsEncrypted() )
1132  {
1133  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1134  pUrl.GetHostId().c_str() );
1135 
1136  char *ubuff = 0;
1137  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1138  if( ret < 0 ) return Status( stError, errInternal );
1139  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1140  return WriteMessageBody( socket, bytesWritten );
1141  }
1142 
1143  //------------------------------------------------------------------------
1144  // Send the data
1145  //------------------------------------------------------------------------
1146  while( !pKBuff->Empty() )
1147  {
1148  int btswrt = 0;
1149  Status st = socket->Send( *pKBuff, btswrt );
1150  bytesWritten += btswrt;
1151  if( !st.IsOK() || st.code == suRetry ) return st;
1152  }
1153 
1154  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1155  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1156  }
1157 
1158  return Status();
1159  }
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)

References XrdCl::Buffer::AdvanceCursor(), XrdCl::ChunkInfo::buffer, XrdCl::Status::code, XrdOucPgrwUtils::csNum(), XrdCl::Log::Debug(), XrdSys::KernelBuffer::Empty(), XrdCl::errInternal, XrdCl::Buffer::GetBuffer(), XrdCl::Buffer::GetBufferAtCursor(), XrdCl::Buffer::GetCursor(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Socket::IsEncrypted(), XrdCl::Status::IsOK(), XrdCl::ChunkInfo::length, XrdSys::Move(), XrdCl::ChunkInfo::offset, XrdSys::PageSize, XrdCl::Socket::Send(), XrdCl::Buffer::SetCursor(), XrdCl::stError, XrdCl::suRetry, and XrdCl::XRootDMsg.

+ Here is the call graph for this function:

Friends And Related Function Documentation

◆ HandleRspJob

friend class HandleRspJob
friend

Definition at line 121 of file XrdClXRootDMsgHandler.hh.


The documentation for this class was generated from the following files: