XRootD
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClURL.hh"
32 #include "XrdCl/XrdClUtils.hh"
34 #include "XrdCl/XrdClJobManager.hh"
35 #include "XrdCl/XrdClSIDManager.hh"
39 #include "XrdCl/XrdClSocket.hh"
40 #include "XrdCl/XrdClTls.hh"
41 #include "XrdCl/XrdClOptimizers.hh"
42 
43 #include "XrdOuc/XrdOucCRC.hh"
45 
46 #include "XrdSys/XrdSysPlatform.hh" // same as above
47 #include "XrdSys/XrdSysAtomics.hh"
48 #include "XrdSys/XrdSysPthread.hh"
49 #include <memory>
50 #include <sstream>
51 #include <numeric>
52 
53 namespace
54 {
55  //----------------------------------------------------------------------------
56  // We need an extra task what will run the handler in the future, because
57  // tasks get deleted and we need the handler
58  //----------------------------------------------------------------------------
59  class WaitTask: public XrdCl::Task
60  {
61  public:
62  WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63  {
64  std::ostringstream o;
65  o << "WaitTask for: 0x" << handler->GetRequest();
66  SetName( o.str() );
67  }
68 
69  virtual time_t Run( time_t now )
70  {
71  pHandler->WaitDone( now );
72  return 0;
73  }
74  private:
75  XrdCl::XRootDMsgHandler *pHandler;
76  };
77 }
78 
79 namespace XrdCl
80 {
81  //----------------------------------------------------------------------------
82  // Delegate the response handling to the thread-pool
83  //----------------------------------------------------------------------------
84  class HandleRspJob: public XrdCl::Job
85  {
86  public:
87  HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88  {
89 
90  }
91 
92  virtual ~HandleRspJob()
93  {
94 
95  }
96 
97  virtual void Run( void *arg )
98  {
99  pHandler->HandleResponse();
100  delete this;
101  }
102  private:
103  XrdCl::XRootDMsgHandler *pHandler;
104  };
105 
106  //----------------------------------------------------------------------------
107  // Examine an incoming message, and decide on the action to be taken
108  //----------------------------------------------------------------------------
109  uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110  {
111  const int sst = pSendingState.fetch_or( kSawResp );
112 
113  if( !( sst & kSendDone ) && !( sst & kSawResp ) )
114  {
115  // we must have been sent although we haven't got the OnStatusReady
116  // notification yet. Set the inflight notice.
117 
118  Log *log = DefaultEnv::GetLog();
119  log->Dump( XRootDMsg, "[%s] Message %s reply received before notification "
120  "that it was sent, assuming it was sent ok.",
121  pUrl.GetHostId().c_str(),
122  pRequest->GetObfuscatedDescription().c_str() );
123  }
124 
125  //--------------------------------------------------------------------------
126  // if the MsgHandler is already being used to process another request
127  // (kXR_oksofar) we need to wait
128  //--------------------------------------------------------------------------
129  if( pOksofarAsAnswer )
130  {
131  XrdSysCondVarHelper lck( pCV );
132  while( pResponse ) pCV.Wait();
133  }
134  else
135  {
136  if( pResponse )
137  {
138  Log *log = DefaultEnv::GetLog();
139  log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
140  "it already owns a response: %p (message: %s ).",
141  pUrl.GetHostId().c_str(), (void*)this,
142  pRequest->GetObfuscatedDescription().c_str() );
143  }
144  }
145 
146  if( msg->GetSize() < 8 )
147  return Ignore;
148 
149  ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
150  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
151  uint16_t status = 0;
152  uint32_t dlen = 0;
153 
154  //--------------------------------------------------------------------------
155  // We only care about async responses, but those are extracted now
156  // in the SocketHandler.
157  //--------------------------------------------------------------------------
158  if( rsp->hdr.status == kXR_attn )
159  {
160  return Ignore;
161  }
162  //--------------------------------------------------------------------------
163  // We got a sync message - check if it belongs to us
164  //--------------------------------------------------------------------------
165  else
166  {
167  if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
168  rsp->hdr.streamid[1] != req->header.streamid[1] )
169  return Ignore;
170 
171  status = rsp->hdr.status;
172  dlen = rsp->hdr.dlen;
173  }
174 
175  //--------------------------------------------------------------------------
176  // We take the ownership of the message and decide what we will do
177  // with the handler itself, the options are:
178  // 1) we want to either read in raw mode (the Raw flag) or have the message
179  // body reconstructed for us by the TransportHandler by the time
180  // Process() is called (default, no extra flag)
181  // 2) we either got a full response in which case we don't want to be
182  // notified about anything anymore (RemoveHandler) or we got a partial
183  // answer and we need to wait for more (default, no extra flag)
184  //--------------------------------------------------------------------------
185  pResponse = msg;
186  pBodyReader->SetDataLength( dlen );
187 
188  Log *log = DefaultEnv::GetLog();
189  switch( status )
190  {
191  //------------------------------------------------------------------------
192  // Handle the cached cases
193  //------------------------------------------------------------------------
194  case kXR_error:
195  case kXR_redirect:
196  case kXR_wait:
197  return RemoveHandler;
198 
199  case kXR_waitresp:
200  {
201  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
202  "message %s", pUrl.GetHostId().c_str(),
203  pRequest->GetObfuscatedDescription().c_str() );
204 
205  pResponse.reset();
206  return Ignore; // This must be handled synchronously!
207  }
208 
209  //------------------------------------------------------------------------
210  // Handle the potential raw cases
211  //------------------------------------------------------------------------
212  case kXR_ok:
213  {
214  //----------------------------------------------------------------------
215  // For kXR_read we read in raw mode
216  //----------------------------------------------------------------------
217  uint16_t reqId = ntohs( req->header.requestid );
218  if( reqId == kXR_read )
219  {
220  return Raw | RemoveHandler;
221  }
222 
223  //----------------------------------------------------------------------
224  // kXR_readv is the same as kXR_read
225  //----------------------------------------------------------------------
226  if( reqId == kXR_readv )
227  {
228  return Raw | RemoveHandler;
229  }
230 
231  //----------------------------------------------------------------------
232  // For everything else we just take what we got
233  //----------------------------------------------------------------------
234  return RemoveHandler;
235  }
236 
237  //------------------------------------------------------------------------
238  // kXR_oksofars are special, they are not full responses, so we reset
239  // the response pointer to 0 and add the message to the partial list
240  //------------------------------------------------------------------------
241  case kXR_oksofar:
242  {
243  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
244  "%s", pUrl.GetHostId().c_str(),
245  pRequest->GetObfuscatedDescription().c_str() );
246 
247  if( !pOksofarAsAnswer )
248  {
249  pPartialResps.emplace_back( std::move( pResponse ) );
250  }
251 
252  //----------------------------------------------------------------------
253  // For kXR_read we either read in raw mode if the message has not
254  // been fully reconstructed already, if it has, we adjust
255  // the buffer offset to prepare for the next one
256  //----------------------------------------------------------------------
257  uint16_t reqId = ntohs( req->header.requestid );
258  if( reqId == kXR_read )
259  {
260  pTimeoutFence.store( true, std::memory_order_relaxed );
261  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
262  }
263 
264  //----------------------------------------------------------------------
265  // kXR_readv is similar to read, except that the payload is different
266  //----------------------------------------------------------------------
267  if( reqId == kXR_readv )
268  {
269  pTimeoutFence.store( true, std::memory_order_relaxed );
270  return Raw | ( pOksofarAsAnswer ? None : NoProcess );
271  }
272 
273  return ( pOksofarAsAnswer ? None : NoProcess );
274  }
275 
276  case kXR_status:
277  {
278  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
279  "%s", pUrl.GetHostId().c_str(),
280  pRequest->GetObfuscatedDescription().c_str() );
281 
282  uint16_t reqId = ntohs( req->header.requestid );
283  if( reqId == kXR_pgwrite )
284  {
285  //--------------------------------------------------------------------
286  // In case of pgwrite by definition this wont be a partial response
287  // so we can already remove the handler from the in-queue
288  //--------------------------------------------------------------------
289  return RemoveHandler;
290  }
291 
292  //----------------------------------------------------------------------
293  // Otherwise (pgread), first of all we need to read the body of the
294  // kXR_status response, we can handle the raw data (if any) only after
295  // we have the whole kXR_status body
296  //----------------------------------------------------------------------
297  pTimeoutFence.store( true, std::memory_order_relaxed );
298  return None;
299  }
300 
301  //------------------------------------------------------------------------
302  // Default
303  //------------------------------------------------------------------------
304  default:
305  return RemoveHandler;
306  }
307  return RemoveHandler;
308  }
309 
310  //----------------------------------------------------------------------------
311  // Reexamine the incoming message, and decide on the action to be taken
312  //----------------------------------------------------------------------------
314  {
315  if( !pResponse )
316  return 0;
317 
318  Log *log = DefaultEnv::GetLog();
319  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
320 
321  //--------------------------------------------------------------------------
322  // Additional action is only required for kXR_status
323  //--------------------------------------------------------------------------
324  if( rsp->hdr.status != kXR_status ) return 0;
325 
326  //--------------------------------------------------------------------------
327  // Ignore malformed status response
328  //--------------------------------------------------------------------------
329  if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
330  {
331  log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
332  return Corrupted;
333  }
334 
335  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
336  uint16_t reqId = ntohs( req->header.requestid );
337  //--------------------------------------------------------------------------
338  // Unmarshal the status body
339  //--------------------------------------------------------------------------
340  XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
341 
342  if( !st.IsOK() && st.code == errDataError )
343  {
344  log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
345  st.GetErrorMessage().c_str() );
346  return Corrupted;
347  }
348 
349  if( !st.IsOK() )
350  {
351  log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
352  pUrl.GetHostId().c_str() );
353  pStatus = st;
354  HandleRspOrQueue();
355  return Ignore;
356  }
357 
358  //--------------------------------------------------------------------------
359  // Common handling for partial results
360  //--------------------------------------------------------------------------
361  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
363  {
364  pPartialResps.push_back( std::move( pResponse ) );
365  }
366 
367  //--------------------------------------------------------------------------
368  // Decide the actions that we need to take
369  //--------------------------------------------------------------------------
370  uint16_t action = 0;
371  if( reqId == kXR_pgread )
372  {
373  //----------------------------------------------------------------------
374  // The message contains only Status header and body but no raw data
375  //----------------------------------------------------------------------
376  if( !pPageReader )
377  pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
378  pPageReader->SetRsp( rspst );
379 
380  action |= Raw;
381 
383  action |= NoProcess;
384  else
385  action |= RemoveHandler;
386  }
387  else if( reqId == kXR_pgwrite )
388  {
389  // if data corruption has been detected on the server side we will
390  // send some additional data pointing to the pages that need to be
391  // retransmitted
392  if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
393  pResponse->GetCursor() )
394  action |= More;
395  }
396 
397  return action;
398  }
399 
400  //----------------------------------------------------------------------------
401  // Get handler sid
402  //----------------------------------------------------------------------------
403  uint16_t XRootDMsgHandler::GetSid() const
404  {
405  ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
406  return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
407  }
408 
409  //----------------------------------------------------------------------------
411  //----------------------------------------------------------------------------
413  {
414  Log *log = DefaultEnv::GetLog();
415 
416  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
417 
418  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
419 
420  //--------------------------------------------------------------------------
421  // If it is a local file, it can be only a metalink redirector
422  //--------------------------------------------------------------------------
423  if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
424  pHosts->back().protocol = kXR_PROTOCOLVERSION;
425 
426  //--------------------------------------------------------------------------
427  // We got an answer, check who we were talking to
428  //--------------------------------------------------------------------------
429  else
430  {
431  AnyObject qryResult;
432  int *qryResponse = nullptr;
433  pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
434  qryResult.Get( qryResponse );
435  if (qryResponse) {
436  pHosts->back().flags = *qryResponse;
437  delete qryResponse;
438  qryResponse = nullptr;
439  }
440  pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
441  qryResult.Get( qryResponse );
442  if (qryResponse) {
443  pHosts->back().protocol = *qryResponse;
444  delete qryResponse;
445  }
446  }
447 
448  //--------------------------------------------------------------------------
449  // Process the message
450  //--------------------------------------------------------------------------
451  Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
452  if( !st.IsOK() )
453  {
454  pStatus = Status( stFatal, errInvalidMessage );
455  HandleResponse();
456  return;
457  }
458 
459  //--------------------------------------------------------------------------
460  // we have an response for the message so it's not in fly anymore
461  //--------------------------------------------------------------------------
462  pSendingState.fetch_or( kInFlyDone );
463 
464  //--------------------------------------------------------------------------
465  // Reset the aggregated wait (used to omit wait response in case of Metalink
466  // redirector)
467  //--------------------------------------------------------------------------
468  if( rsp->hdr.status != kXR_wait )
469  pAggregatedWaitTime = 0;
470 
471  switch( rsp->hdr.status )
472  {
473  //------------------------------------------------------------------------
474  // kXR_ok - we're done here
475  //------------------------------------------------------------------------
476  case kXR_ok:
477  {
478  log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
479  pUrl.GetHostId().c_str(),
480  pRequest->GetObfuscatedDescription().c_str() );
481  pStatus = Status();
482  HandleResponse();
483  return;
484  }
485 
486  case kXR_status:
487  {
488  log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
489  pUrl.GetHostId().c_str(),
490  pRequest->GetObfuscatedDescription().c_str() );
491  pStatus = Status();
492  HandleResponse();
493  return;
494  }
495 
496  //------------------------------------------------------------------------
497  // kXR_ok - we're serving partial result to the user
498  //------------------------------------------------------------------------
499  case kXR_oksofar:
500  {
501  log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
502  pUrl.GetHostId().c_str(),
503  pRequest->GetObfuscatedDescription().c_str() );
504  pStatus = Status( stOK, suContinue );
505  HandleResponse();
506  return;
507  }
508 
509  //------------------------------------------------------------------------
510  // kXR_error - we've got a problem
511  //------------------------------------------------------------------------
512  case kXR_error:
513  {
514  char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
515  memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
516  log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
517  "[%d] %s", pUrl.GetHostId().c_str(),
518  pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
519  errmsg );
520  delete [] errmsg;
521 
522  HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
523  return;
524  }
525 
526  //------------------------------------------------------------------------
527  // kXR_redirect - they tell us to go elsewhere
528  //------------------------------------------------------------------------
529  case kXR_redirect:
530  {
531  if( rsp->hdr.dlen <= 4 )
532  {
533  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
534  pUrl.GetHostId().c_str() );
535  pStatus = Status( stError, errInvalidResponse );
536  HandleResponse();
537  return;
538  }
539 
540  char *urlInfoBuff = new char[rsp->hdr.dlen-3];
541  urlInfoBuff[rsp->hdr.dlen-4] = 0;
542  memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
543  std::string urlInfo = urlInfoBuff;
544  delete [] urlInfoBuff;
545  log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
546  "message %s: %s, port %d", pUrl.GetHostId().c_str(),
547  pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
548  rsp->body.redirect.port );
549 
550  //----------------------------------------------------------------------
551  // Check if we can proceed
552  //----------------------------------------------------------------------
553  if( !pRedirectCounter )
554  {
555  log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
556  "message %s, the last known error is: %s",
557  pUrl.GetHostId().c_str(),
558  pRequest->GetObfuscatedDescription().c_str(),
559  pLastError.ToString().c_str() );
560 
561 
562  pStatus = Status( stFatal, errRedirectLimit );
563  HandleResponse();
564  return;
565  }
566  --pRedirectCounter;
567 
568  //----------------------------------------------------------------------
569  // Keep the info about this server if we still need to find a load
570  // balancer
571  //----------------------------------------------------------------------
572  uint32_t flags = pHosts->back().flags;
573  if( !pHasLoadBalancer )
574  {
575  if( flags & kXR_isManager )
576  {
577  //------------------------------------------------------------------
578  // If the current server is a meta manager then it supersedes
579  // any existing load balancer, otherwise we assign a load-balancer
580  // only if it has not been already assigned
581  //------------------------------------------------------------------
582  if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
583  {
584  pLoadBalancer = pHosts->back();
585  log->Dump( XRootDMsg, "[%s] Current server has been assigned "
586  "as a load-balancer for message %s",
587  pUrl.GetHostId().c_str(),
588  pRequest->GetObfuscatedDescription().c_str() );
589  HostList::iterator it;
590  for( it = pHosts->begin(); it != pHosts->end(); ++it )
591  it->loadBalancer = false;
592  pHosts->back().loadBalancer = true;
593  }
594  }
595  }
596 
597  //----------------------------------------------------------------------
598  // If the redirect comes from a data server safe the URL because
599  // in case of a failure we will use it as the effective data server URL
600  // for the tried CGI opaque info
601  //----------------------------------------------------------------------
602  if( flags & kXR_isServer )
603  pEffectiveDataServerUrl = new URL( pHosts->back().url );
604 
605  //----------------------------------------------------------------------
606  // Build the URL and check it's validity
607  //----------------------------------------------------------------------
608  std::vector<std::string> urlComponents;
609  std::string newCgi;
610  Utils::splitString( urlComponents, urlInfo, "?" );
611 
612  std::ostringstream o;
613 
614  o << urlComponents[0];
615  if( rsp->body.redirect.port > 0 )
616  o << ":" << rsp->body.redirect.port << "/";
617  else if( rsp->body.redirect.port < 0 )
618  {
619  //--------------------------------------------------------------------
620  // check if the manager wants to enforce write recovery at himself
621  // (beware we are dealing here with negative flags)
622  //--------------------------------------------------------------------
623  if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
624  pHosts->back().flags |= kXR_recoverWrts;
625 
626  //--------------------------------------------------------------------
627  // check if the manager wants to collapse the communication channel
628  // (the redirect host is to replace the current host)
629  //--------------------------------------------------------------------
630  if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
631  {
632  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
633  pPostMaster->CollapseRedirect( pUrl, url );
634  }
635 
636  if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
637  {
638  std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
639  if( Utils::CheckEC( pRequest, url ) )
640  pRedirectAsAnswer = true;
641  }
642  }
643 
644  URL newUrl = URL( o.str() );
645  if( !newUrl.IsValid() )
646  {
647  pStatus = Status( stError, errInvalidRedirectURL );
648  log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
649  pUrl.GetHostId().c_str(), urlInfo.c_str() );
650  HandleResponse();
651  return;
652  }
653 
654  if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
655  newUrl.SetUserName( pUrl.GetUserName() );
656 
657  if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
658  newUrl.SetPassword( pUrl.GetPassword() );
659 
660  //----------------------------------------------------------------------
661  // Forward any "xrd.*" params from the original client request also to
662  // the new redirection url
663  // Also, we need to preserve any "xrdcl.*' as they are important for
664  // our internal workflows.
665  //----------------------------------------------------------------------
666  std::ostringstream ossXrd;
667  const URL::ParamsMap &urlParams = pUrl.GetParams();
668 
669  for(URL::ParamsMap::const_iterator it = urlParams.begin();
670  it != urlParams.end(); ++it )
671  {
672  if( it->first.compare( 0, 4, "xrd." ) &&
673  it->first.compare( 0, 6, "xrdcl." ) )
674  continue;
675 
676  ossXrd << it->first << '=' << it->second << '&';
677  }
678 
679  std::string xrdCgi = ossXrd.str();
680  pRedirectUrl = newUrl.GetURL();
681 
682  URL cgiURL;
683  if( urlComponents.size() > 1 )
684  {
685  pRedirectUrl += "?";
686  pRedirectUrl += urlComponents[1];
687  std::ostringstream o;
688  o << "fake://fake:111//fake?";
689  o << urlComponents[1];
690 
691  if( urlComponents.size() == 3 )
692  o << '?' << urlComponents[2];
693 
694  if (!xrdCgi.empty())
695  {
696  o << '&' << xrdCgi;
697  pRedirectUrl += '&';
698  pRedirectUrl += xrdCgi;
699  }
700 
701  cgiURL = URL( o.str() );
702  }
703  else {
704  if (!xrdCgi.empty())
705  {
706  std::ostringstream o;
707  o << "fake://fake:111//fake?";
708  o << xrdCgi;
709  cgiURL = URL( o.str() );
710  pRedirectUrl += '?';
711  pRedirectUrl += xrdCgi;
712  }
713  }
714 
715  //----------------------------------------------------------------------
716  // Check if we need to return the URL as a response
717  //----------------------------------------------------------------------
718  if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
719  newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
720  !newUrl.IsLocalFile() )
721  pRedirectAsAnswer = true;
722 
723  if( pRedirectAsAnswer )
724  {
725  pStatus = Status( stError, errRedirect );
726  HandleResponse();
727  return;
728  }
729 
730  //----------------------------------------------------------------------
731  // Rewrite the message in a way required to send it to another server
732  //----------------------------------------------------------------------
733  newUrl.SetParams( cgiURL.GetParams() );
734  Status st = RewriteRequestRedirect( newUrl );
735  if( !st.IsOK() )
736  {
737  pStatus = st;
738  HandleResponse();
739  return;
740  }
741 
742  //----------------------------------------------------------------------
743  // Make sure we don't change the protocol by accident (root vs roots)
744  //----------------------------------------------------------------------
745  if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
746  ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
747  newUrl.SetProtocol( "roots" );
748 
749  //----------------------------------------------------------------------
750  // Send the request to the new location
751  //----------------------------------------------------------------------
752  HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
753  return;
754  }
755 
756  //------------------------------------------------------------------------
757  // kXR_wait - we wait, and re-issue the request later
758  //------------------------------------------------------------------------
759  case kXR_wait:
760  {
761  uint32_t waitSeconds = 0;
762 
763  if( rsp->hdr.dlen >= 4 )
764  {
765  char *infoMsg = new char[rsp->hdr.dlen-3];
766  infoMsg[rsp->hdr.dlen-4] = 0;
767  memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
768  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
769  "message %s: %s", pUrl.GetHostId().c_str(),
770  rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
771  infoMsg );
772  delete [] infoMsg;
773  waitSeconds = rsp->body.wait.seconds;
774  }
775  else
776  {
777  log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
778  "message %s", pUrl.GetHostId().c_str(),
779  pRequest->GetObfuscatedDescription().c_str() );
780  }
781 
782  pAggregatedWaitTime += waitSeconds;
783 
784  // We need a special case if the data node comes from metalink
785  // redirector. In this case it might make more sense to try the
786  // next entry in the Metalink than wait.
787  if( OmitWait( *pRequest, pLoadBalancer.url ) )
788  {
789  int maxWait = DefaultMaxMetalinkWait;
790  DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
791  if( pAggregatedWaitTime > maxWait )
792  {
793  UpdateTriedCGI();
794  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
795  return;
796  }
797  }
798 
799  //----------------------------------------------------------------------
800  // Some messages require rewriting before they can be sent again
801  // after wait
802  //----------------------------------------------------------------------
803  Status st = RewriteRequestWait();
804  if( !st.IsOK() )
805  {
806  pStatus = st;
807  HandleResponse();
808  return;
809  }
810 
811  //----------------------------------------------------------------------
812  // Register a task to resend the message in some seconds, if we still
813  // have time to do that, and report a timeout otherwise
814  //----------------------------------------------------------------------
815  time_t resendTime = ::time(0)+waitSeconds;
816 
817  if( resendTime < pExpiration )
818  {
819  log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
820  pUrl.GetHostId().c_str(), (void*)this,
821  pRequest->GetObfuscatedDescription().c_str() );
822 
823  TaskManager *taskMgr = pPostMaster->GetTaskManager();
824  taskMgr->RegisterTask( new WaitTask( this ), resendTime );
825  }
826  else
827  {
828  log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
829  pUrl.GetHostId().c_str(),
830  pRequest->GetObfuscatedDescription().c_str() );
831  HandleError( Status( stError, errOperationExpired) );
832  }
833  return;
834  }
835 
836  //------------------------------------------------------------------------
837  // kXR_waitresp - the response will be returned in some seconds as an
838  // unsolicited message. Currently all messages of this type are handled
839  // one step before in the XrdClStream::OnIncoming as they need to be
840  // processed synchronously.
841  //------------------------------------------------------------------------
842  case kXR_waitresp:
843  {
844  if( rsp->hdr.dlen < 4 )
845  {
846  log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
847  pUrl.GetHostId().c_str() );
848  pStatus = Status( stError, errInvalidResponse );
849  HandleResponse();
850  return;
851  }
852 
853  log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
854  "message %s", pUrl.GetHostId().c_str(),
855  rsp->body.waitresp.seconds,
856  pRequest->GetObfuscatedDescription().c_str() );
857  return;
858  }
859 
860  //------------------------------------------------------------------------
861  // Default - unrecognized/unsupported response, declare an error
862  //------------------------------------------------------------------------
863  default:
864  {
865  log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
866  "message %s", pUrl.GetHostId().c_str(),
867  rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
868  pStatus = Status( stError, errInvalidResponse );
869  HandleResponse();
870  return;
871  }
872  }
873 
874  return;
875  }
876 
877  //----------------------------------------------------------------------------
878  // Handle an event other that a message arrival - may be timeout
879  //----------------------------------------------------------------------------
881  XRootDStatus status )
882  {
883  Log *log = DefaultEnv::GetLog();
884  log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
885  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
886 
887  if( event == Ready )
888  return 0;
889 
890  if( pTimeoutFence.load( std::memory_order_relaxed ) )
891  return 0;
892 
893  HandleError( status );
894  return RemoveHandler;
895  }
896 
897  //----------------------------------------------------------------------------
898  // Read message body directly from a socket
899  //----------------------------------------------------------------------------
901  Socket *socket,
902  uint32_t &bytesRead )
903  {
904  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
905  uint16_t reqId = ntohs( req->header.requestid );
906 
907  if( reqId == kXR_pgread )
908  return pPageReader->Read( *socket, bytesRead );
909 
910  return pBodyReader->Read( *socket, bytesRead );
911  }
912 
913  //----------------------------------------------------------------------------
914  // We're here when we requested sending something over the wire
915  // or other status update on this action.
916  // We can be called when message is still in out-queue, with an
917  // error status indicating message will not be sent.
918  //----------------------------------------------------------------------------
920  XRootDStatus status )
921  {
922  Log *log = DefaultEnv::GetLog();
923 
924  if( status.IsOK() )
925  {
926  log->Dump( XRootDMsg, "[%s] Got notification that outgoing message %s "
927  "was sent successfully.", pUrl.GetHostId().c_str(),
928  message->GetObfuscatedDescription().c_str() );
929  }
930 
931  // After setting kSendDone processing of this object may continue in
932  // another thread. Unless we're in an error condition our object may
933  // be modified or even destroyed after this point.
934  const int sst = pSendingState.fetch_or( kSendDone );
935 
936  // ignore if we're already in this state
937  if( status.IsOK() && ( sst & kSendDone ) ) return;
938 
939  // if we have already seen a response we should be getting notified
940  // of a successful send. But if not, log and do our best to recover.
941  if( !status.IsOK() && ( ( sst & kFinalResp ) || ( sst & kSawResp ) ) )
942  {
943  log->Error( XRootDMsg, "[%s] Unexpected error for message %s. Trying to "
944  "recover.", pUrl.GetHostId().c_str(),
945  message->GetObfuscatedDescription().c_str() );
946  HandleError( status );
947  return;
948  }
949 
950  if( sst & kFinalResp )
951  {
952  // late notification and we already have final response for user,
953  // need to queue handler callback.
954  HandleRspOrQueue();
955  return;
956  }
957 
958  if( sst & kRetryAtSrv )
959  {
960  // late notification and we already received a response and know
961  // we need to retry at differnt server.
962  HandleError( RetryAtServer( pRetryAtUrl, pRetryAtEntryType ) );
963  return;
964  }
965 
966  if( sst & kSawResp )
967  {
968  // late notification, response processing may be happening in another
969  // thread.
970  return;
971  }
972 
973  //--------------------------------------------------------------------------
974  // We were successful, so we now need to listen for a response
975  //--------------------------------------------------------------------------
976  if( status.IsOK() )
977  {
978  // this is the expcted order, we got the notificaiton but no response
979  // received yet. However another thread is liable to be processing
980  // one or sending a final response and deleting us at any point now.
981  return;
982  }
983 
984  //--------------------------------------------------------------------------
985  // We have failed, recover if possible
986  //--------------------------------------------------------------------------
987  log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
988  "recover.", pUrl.GetHostId().c_str(),
989  message->GetObfuscatedDescription().c_str() );
990  HandleError( status );
991  }
992 
993  //----------------------------------------------------------------------------
994  // Are we a raw writer or not?
995  //----------------------------------------------------------------------------
997  {
998  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
999  uint16_t reqId = ntohs( req->header.requestid );
1000  if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
1001  return true;
1002  // checkpoint + execute
1003  if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
1004  {
1005  ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
1006  reqId = ntohs( xeq->header.requestid );
1007  return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
1008  }
1009 
1010  return false;
1011  }
1012 
1013  //----------------------------------------------------------------------------
1014  // Write the message body
1015  //----------------------------------------------------------------------------
1017  uint32_t &bytesWritten )
1018  {
1019  //--------------------------------------------------------------------------
1020  // First check if it is a PgWrite
1021  //--------------------------------------------------------------------------
1022  if( !pChunkList->empty() && !pCrc32cDigests.empty() )
1023  {
1024  //------------------------------------------------------------------------
1025  // PgWrite will have just one chunk
1026  //------------------------------------------------------------------------
1027  ChunkInfo chunk = pChunkList->front();
1028  //------------------------------------------------------------------------
1029  // Calculate the size of the first and last page (in case the chunk is not
1030  // 4KB aligned)
1031  //------------------------------------------------------------------------
1032  int fLen = 0, lLen = 0;
1033  size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
1034 
1035  //------------------------------------------------------------------------
1036  // Set the crc32c buffer if not ready yet
1037  //------------------------------------------------------------------------
1038  if( pPgWrtCksumBuff.GetCursor() == 0 )
1039  {
1040  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1041  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1042  }
1043 
1044  uint32_t btsLeft = chunk.length - pAsyncOffset;
1045  uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
1046  if( pglen > btsLeft ) pglen = btsLeft;
1047  char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
1048 
1049  while( btsLeft > 0 )
1050  {
1051  // first write the crc32c digest
1052  while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
1053  {
1054  uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
1055  char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
1056  int btswrt = 0;
1057  Status st = socket->Send( dgstbuf, dgstlen, btswrt );
1058  if( !st.IsOK() ) return st;
1059  bytesWritten += btswrt;
1060  pPgWrtCksumBuff.AdvanceCursor( btswrt );
1061  if( st.code == suRetry ) return st;
1062  }
1063  // then write the raw data (one page)
1064  int btswrt = 0;
1065  Status st = socket->Send( pgbuf, pglen, btswrt );
1066  if( !st.IsOK() ) return st;
1067  pgbuf += btswrt;
1068  pglen -= btswrt;
1069  btsLeft -= btswrt;
1070  bytesWritten += btswrt;
1071  pAsyncOffset += btswrt; // update the offset to the raw data
1072  if( st.code == suRetry ) return st;
1073  // if we managed to write all the data ...
1074  if( pglen == 0 )
1075  {
1076  // move to the next page
1077  ++pPgWrtCurrentPageNb;
1078  if( pPgWrtCurrentPageNb < nbpgs )
1079  {
1080  // set the digest buffer
1081  pPgWrtCksumBuff.SetCursor( 0 );
1082  uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1083  memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1084  }
1085  // set the page length
1086  pglen = XrdSys::PageSize;
1087  if( pglen > btsLeft ) pglen = btsLeft;
1088  // reset offset in the current page
1089  pPgWrtCurrentPageOffset = 0;
1090  }
1091  else
1092  // otherwise just adjust the offset in the current page
1093  pPgWrtCurrentPageOffset += btswrt;
1094 
1095  }
1096  }
1097  else if( !pChunkList->empty() )
1098  {
1099  size_t size = pChunkList->size();
1100  for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1101  {
1102  char *buffer = (char*)(*pChunkList)[i].buffer;
1103  uint32_t size = (*pChunkList)[i].length;
1104  size_t leftToBeWritten = size - pAsyncOffset;
1105 
1106  while( leftToBeWritten )
1107  {
1108  int btswrt = 0;
1109  Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1110  bytesWritten += btswrt;
1111  if( !st.IsOK() || st.code == suRetry ) return st;
1112  pAsyncOffset += btswrt;
1113  leftToBeWritten -= btswrt;
1114  }
1115  //----------------------------------------------------------------------
1116  // Remember that we have moved to the next chunk, also clear the offset
1117  // within the buffer as we are going to move to a new one
1118  //----------------------------------------------------------------------
1119  ++pAsyncChunkIndex;
1120  pAsyncOffset = 0;
1121  }
1122  }
1123  else
1124  {
1125  Log *log = DefaultEnv::GetLog();
1126 
1127  //------------------------------------------------------------------------
1128  // If the socket is encrypted we cannot use a kernel buffer, we have to
1129  // convert to user space buffer
1130  //------------------------------------------------------------------------
1131  if( socket->IsEncrypted() )
1132  {
1133  log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1134  pUrl.GetHostId().c_str() );
1135 
1136  char *ubuff = 0;
1137  ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1138  if( ret < 0 ) return Status( stError, errInternal );
1139  pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1140  return WriteMessageBody( socket, bytesWritten );
1141  }
1142 
1143  //------------------------------------------------------------------------
1144  // Send the data
1145  //------------------------------------------------------------------------
1146  while( !pKBuff->Empty() )
1147  {
1148  int btswrt = 0;
1149  Status st = socket->Send( *pKBuff, btswrt );
1150  bytesWritten += btswrt;
1151  if( !st.IsOK() || st.code == suRetry ) return st;
1152  }
1153 
1154  log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1155  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1156  }
1157 
1158  return Status();
1159  }
1160 
1161  //----------------------------------------------------------------------------
1162  // We're here when we got a time event. We needed to re-issue the request
1163  // in some time in the future, and that moment has arrived
1164  //----------------------------------------------------------------------------
1166  {
1167  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1168  }
1169 
1170  //----------------------------------------------------------------------------
1171  // Bookkeeping after partial response has been received.
1172  //----------------------------------------------------------------------------
1174  {
1175  pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1176  }
1177 
1178  //----------------------------------------------------------------------------
1179  // Unpack the message and call the response handler
1180  //----------------------------------------------------------------------------
1181  void XRootDMsgHandler::HandleResponse()
1182  {
1183  //--------------------------------------------------------------------------
1184  // Is it a final response?
1185  //--------------------------------------------------------------------------
1186  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1187  if( finalrsp )
1188  {
1189  // Do not do final processing of the response if we haven't had
1190  // confirmation the original request was sent (via OnStatusReady).
1191  // The final processing will be triggered when we get the confirm.
1192  const int sst = pSendingState.fetch_or( kFinalResp );
1193  if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
1194  return;
1195  }
1196 
1197  //--------------------------------------------------------------------------
1198  // Process the response and notify the listener
1199  //--------------------------------------------------------------------------
1201  XRootDStatus *status = ProcessStatus();
1202  AnyObject *response = 0;
1203 
1204  Log *log = DefaultEnv::GetLog();
1205  log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1206  "with status: %s.",
1207  pUrl.GetHostId().c_str(), (void*)this,
1208  pRequest->GetObfuscatedDescription().c_str(),
1209  status->ToString().c_str() );
1210 
1211  if( status->IsOK() )
1212  {
1213  Status st = ParseResponse( response );
1214  if( !st.IsOK() )
1215  {
1216  delete status;
1217  delete response;
1218  status = new XRootDStatus( st );
1219  response = 0;
1220  }
1221  }
1222 
1223  //--------------------------------------------------------------------------
1224  // Close the redirect entry if necessary
1225  //--------------------------------------------------------------------------
1226  if( pRdirEntry )
1227  {
1228  pRdirEntry->status = *status;
1229  pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1230  }
1231 
1232  //--------------------------------------------------------------------------
1233  // Release the stream id
1234  //--------------------------------------------------------------------------
1235  if( pSidMgr && finalrsp )
1236  {
1237  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1238  if( status->IsOK() || !IsInFly() ||
1239  !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1240  pSidMgr->ReleaseSID( req->header.streamid );
1241  }
1242 
1243  HostList *hosts = pHosts.release();
1244  if( !finalrsp )
1245  pHosts.reset( new HostList( *hosts ) );
1246 
1247  pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1248 
1249  //--------------------------------------------------------------------------
1250  // if it is the final response there is nothing more to do ...
1251  //--------------------------------------------------------------------------
1252  if( finalrsp )
1253  delete this;
1254  //--------------------------------------------------------------------------
1255  // on the other hand if it is not the final response, we have to keep the
1256  // MsgHandler and delete the current response
1257  //--------------------------------------------------------------------------
1258  else
1259  {
1260  XrdSysCondVarHelper lck( pCV );
1261  pResponse.reset();
1262  pTimeoutFence.store( false, std::memory_order_relaxed );
1263  pCV.Broadcast();
1264  }
1265  }
1266 
1267 
1268  //----------------------------------------------------------------------------
1269  // Extract the status information from the stuff that we got
1270  //----------------------------------------------------------------------------
1271  XRootDStatus *XRootDMsgHandler::ProcessStatus()
1272  {
1273  XRootDStatus *st = new XRootDStatus( pStatus );
1274  ServerResponse *rsp = 0;
1275  if( pResponse )
1276  rsp = (ServerResponse *)pResponse->GetBuffer();
1277 
1278  if( !pStatus.IsOK() && rsp )
1279  {
1280  if( pStatus.code == errErrorResponse )
1281  {
1282  st->errNo = rsp->body.error.errnum;
1283  // omit the last character as the string returned from the server
1284  // (acording to protocol specs) should be null-terminated
1285  std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1286  if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1287  errmsg += " Last seen error: " + pLastError.ToString();
1288  st->SetErrorMessage( errmsg );
1289  }
1290  else if( pStatus.code == errRedirect )
1291  st->SetErrorMessage( pRedirectUrl );
1292  }
1293  return st;
1294  }
1295 
1296  //------------------------------------------------------------------------
1297  // Parse the response and put it in an object that could be passed to
1298  // the user
1299  //------------------------------------------------------------------------
1300  Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1301  {
1302  if( !pResponse )
1303  return Status();
1304 
1305  ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1306  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1307  Log *log = DefaultEnv::GetLog();
1308 
1309  //--------------------------------------------------------------------------
1310  // Handle redirect as an answer
1311  //--------------------------------------------------------------------------
1312  if( rsp->hdr.status == kXR_redirect )
1313  {
1314  log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1315  return 0;
1316  }
1317 
1318  Buffer buff;
1319  uint32_t length = 0;
1320  char *buffer = 0;
1321 
1322  //--------------------------------------------------------------------------
1323  // We don't have any partial answers so pass what we have
1324  //--------------------------------------------------------------------------
1325  if( pPartialResps.empty() )
1326  {
1327  buffer = rsp->body.buffer.data;
1328  length = rsp->hdr.dlen;
1329  }
1330  //--------------------------------------------------------------------------
1331  // Partial answers, we need to glue them together before parsing
1332  //--------------------------------------------------------------------------
1333  else if( req->header.requestid != kXR_read &&
1334  req->header.requestid != kXR_readv )
1335  {
1336  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1337  {
1338  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1339  length += part->hdr.dlen;
1340  }
1341  length += rsp->hdr.dlen;
1342 
1343  buff.Allocate( length );
1344  uint32_t offset = 0;
1345  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1346  {
1347  ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1348  buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1349  offset += part->hdr.dlen;
1350  }
1351  buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1352  buffer = buff.GetBuffer();
1353  }
1354 
1355  //--------------------------------------------------------------------------
1356  // Right, but what was the question?
1357  //--------------------------------------------------------------------------
1358  switch( req->header.requestid )
1359  {
1360  //------------------------------------------------------------------------
1361  // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1362  // kXR_ping, kXR_close, kXR_write, kXR_sync
1363  //------------------------------------------------------------------------
1364  case kXR_mv:
1365  case kXR_truncate:
1366  case kXR_rm:
1367  case kXR_mkdir:
1368  case kXR_rmdir:
1369  case kXR_chmod:
1370  case kXR_ping:
1371  case kXR_close:
1372  case kXR_write:
1373  case kXR_writev:
1374  case kXR_sync:
1375  case kXR_chkpoint:
1376  return Status();
1377 
1378  //------------------------------------------------------------------------
1379  // kXR_locate
1380  //------------------------------------------------------------------------
1381  case kXR_locate:
1382  {
1383  AnyObject *obj = new AnyObject();
1384 
1385  char *nullBuffer = new char[length+1];
1386  nullBuffer[length] = 0;
1387  memcpy( nullBuffer, buffer, length );
1388 
1389  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1390  "LocateInfo: %s", pUrl.GetHostId().c_str(),
1391  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1392  LocationInfo *data = new LocationInfo();
1393 
1394  if( data->ParseServerResponse( nullBuffer ) == false )
1395  {
1396  delete obj;
1397  delete data;
1398  delete [] nullBuffer;
1399  return Status( stError, errInvalidResponse );
1400  }
1401  delete [] nullBuffer;
1402 
1403  obj->Set( data );
1404  response = obj;
1405  return Status();
1406  }
1407 
1408  //------------------------------------------------------------------------
1409  // kXR_stat
1410  //------------------------------------------------------------------------
1411  case kXR_stat:
1412  {
1413  AnyObject *obj = new AnyObject();
1414 
1415  //----------------------------------------------------------------------
1416  // Virtual File System stat (kXR_vfs)
1417  //----------------------------------------------------------------------
1418  if( req->stat.options & kXR_vfs )
1419  {
1420  StatInfoVFS *data = new StatInfoVFS();
1421 
1422  char *nullBuffer = new char[length+1];
1423  nullBuffer[length] = 0;
1424  memcpy( nullBuffer, buffer, length );
1425 
1426  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1427  "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1428  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1429 
1430  if( data->ParseServerResponse( nullBuffer ) == false )
1431  {
1432  delete obj;
1433  delete data;
1434  delete [] nullBuffer;
1435  return Status( stError, errInvalidResponse );
1436  }
1437  delete [] nullBuffer;
1438 
1439  obj->Set( data );
1440  }
1441  //----------------------------------------------------------------------
1442  // Normal stat
1443  //----------------------------------------------------------------------
1444  else
1445  {
1446  StatInfo *data = new StatInfo();
1447 
1448  char *nullBuffer = new char[length+1];
1449  nullBuffer[length] = 0;
1450  memcpy( nullBuffer, buffer, length );
1451 
1452  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1453  "%s", pUrl.GetHostId().c_str(),
1454  pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1455 
1456  if( data->ParseServerResponse( nullBuffer ) == false )
1457  {
1458  delete obj;
1459  delete data;
1460  delete [] nullBuffer;
1461  return Status( stError, errInvalidResponse );
1462  }
1463  delete [] nullBuffer;
1464  obj->Set( data );
1465  }
1466 
1467  response = obj;
1468  return Status();
1469  }
1470 
1471  //------------------------------------------------------------------------
1472  // kXR_protocol
1473  //------------------------------------------------------------------------
1474  case kXR_protocol:
1475  {
1476  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1477  pUrl.GetHostId().c_str(),
1478  pRequest->GetObfuscatedDescription().c_str() );
1479 
1480  if( rsp->hdr.dlen < 8 )
1481  {
1482  log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1483  pUrl.GetHostId().c_str() );
1484  return Status( stError, errInvalidResponse );
1485  }
1486 
1487  AnyObject *obj = new AnyObject();
1488  ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1489  rsp->body.protocol.flags );
1490  obj->Set( data );
1491  response = obj;
1492  return Status();
1493  }
1494 
1495  //------------------------------------------------------------------------
1496  // kXR_dirlist
1497  //------------------------------------------------------------------------
1498  case kXR_dirlist:
1499  {
1500  AnyObject *obj = new AnyObject();
1501  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1502  "DirectoryList", pUrl.GetHostId().c_str(),
1503  pRequest->GetObfuscatedDescription().c_str() );
1504 
1505  char *path = new char[req->dirlist.dlen+1];
1506  path[req->dirlist.dlen] = 0;
1507  memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1508 
1509  DirectoryList *data = new DirectoryList();
1510  data->SetParentName( path );
1511  delete [] path;
1512 
1513  char *nullBuffer = new char[length+1];
1514  nullBuffer[length] = 0;
1515  memcpy( nullBuffer, buffer, length );
1516 
1517  bool invalidrsp = false;
1518 
1519  if( !pDirListStarted )
1520  {
1521  pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1522  pDirListStarted = true;
1523 
1524  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1525  }
1526  else
1527  invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1528 
1529  if( invalidrsp )
1530  {
1531  delete data;
1532  delete obj;
1533  delete [] nullBuffer;
1534  return Status( stError, errInvalidResponse );
1535  }
1536 
1537  delete [] nullBuffer;
1538  obj->Set( data );
1539  response = obj;
1540  return Status();
1541  }
1542 
1543  //------------------------------------------------------------------------
1544  // kXR_open - if we got the statistics, otherwise return 0
1545  //------------------------------------------------------------------------
1546  case kXR_open:
1547  {
1548  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1549  pUrl.GetHostId().c_str(),
1550  pRequest->GetObfuscatedDescription().c_str() );
1551 
1552  if( rsp->hdr.dlen < 4 )
1553  {
1554  log->Error( XRootDMsg, "[%s] Got invalid open response.",
1555  pUrl.GetHostId().c_str() );
1556  return Status( stError, errInvalidResponse );
1557  }
1558 
1559  AnyObject *obj = new AnyObject();
1560  StatInfo *statInfo = 0;
1561 
1562  //----------------------------------------------------------------------
1563  // Handle StatInfo if requested
1564  //----------------------------------------------------------------------
1565  if( req->open.options & kXR_retstat )
1566  {
1567  log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1568  pUrl.GetHostId().c_str(),
1569  pRequest->GetObfuscatedDescription().c_str() );
1570 
1571  if( rsp->hdr.dlen >= 12 )
1572  {
1573  char *nullBuffer = new char[rsp->hdr.dlen-11];
1574  nullBuffer[rsp->hdr.dlen-12] = 0;
1575  memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1576 
1577  statInfo = new StatInfo();
1578  if( statInfo->ParseServerResponse( nullBuffer ) == false )
1579  {
1580  delete statInfo;
1581  statInfo = 0;
1582  }
1583  delete [] nullBuffer;
1584  }
1585 
1586  if( rsp->hdr.dlen < 12 || !statInfo )
1587  {
1588  log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1589  "to %s", pUrl.GetHostId().c_str(),
1590  pRequest->GetObfuscatedDescription().c_str() );
1591  delete obj;
1592  return Status( stError, errInvalidResponse );
1593  }
1594  }
1595 
1596  OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1597  pResponse->GetSessionId(),
1598  statInfo );
1599  obj->Set( data );
1600  response = obj;
1601  return Status();
1602  }
1603 
1604  //------------------------------------------------------------------------
1605  // kXR_read
1606  //------------------------------------------------------------------------
1607  case kXR_read:
1608  {
1609  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1610  pUrl.GetHostId().c_str(),
1611  pRequest->GetObfuscatedDescription().c_str() );
1612 
1613  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1614  {
1615  //--------------------------------------------------------------------
1616  // we are expecting to have only the header in the message, the raw
1617  // data have been readout into the user buffer
1618  //--------------------------------------------------------------------
1619  if( pPartialResps[i]->GetSize() > 8 )
1620  return Status( stOK, errInternal );
1621  }
1622  //----------------------------------------------------------------------
1623  // we are expecting to have only the header in the message, the raw
1624  // data have been readout into the user buffer
1625  //----------------------------------------------------------------------
1626  if( pResponse->GetSize() > 8 )
1627  return Status( stOK, errInternal );
1628  //----------------------------------------------------------------------
1629  // Get the response for the end user
1630  //----------------------------------------------------------------------
1631  return pBodyReader->GetResponse( response );
1632  }
1633 
1634  //------------------------------------------------------------------------
1635  // kXR_pgread
1636  //------------------------------------------------------------------------
1637  case kXR_pgread:
1638  {
1639  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1640  pUrl.GetHostId().c_str(),
1641  pRequest->GetObfuscatedDescription().c_str() );
1642 
1643  //----------------------------------------------------------------------
1644  // Glue in the cached responses if necessary
1645  //----------------------------------------------------------------------
1646  ChunkInfo chunk = pChunkList->front();
1647  bool sizeMismatch = false;
1648  uint32_t currentOffset = 0;
1649  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1650  {
1651  ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1652 
1653  //--------------------------------------------------------------------
1654  // the actual size of the raw data without the crc32c checksums
1655  //--------------------------------------------------------------------
1656  size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1657  part->status.bdy.dlen ) * CksumSize;
1658 
1659  if( currentOffset + datalen > chunk.length )
1660  {
1661  sizeMismatch = true;
1662  break;
1663  }
1664 
1665  currentOffset += datalen;
1666  }
1667 
1668  ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1669  size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1670  rspst->status.bdy.dlen ) * CksumSize;
1671  if( currentOffset + datalen <= chunk.length )
1672  currentOffset += datalen;
1673  else
1674  sizeMismatch = true;
1675 
1676  //----------------------------------------------------------------------
1677  // Overflow
1678  //----------------------------------------------------------------------
1679  if( pChunkStatus.front().sizeError || sizeMismatch )
1680  {
1681  log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1682  "buffer is too small for the received data.",
1683  pUrl.GetHostId().c_str(),
1684  pRequest->GetObfuscatedDescription().c_str() );
1685  return Status( stError, errInvalidResponse );
1686  }
1687 
1688  AnyObject *obj = new AnyObject();
1689  PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1690  std::move( pCrc32cDigests) );
1691 
1692  obj->Set( pgInfo );
1693  response = obj;
1694  return Status();
1695  }
1696 
1697  //------------------------------------------------------------------------
1698  // kXR_pgwrite
1699  //------------------------------------------------------------------------
1700  case kXR_pgwrite:
1701  {
1702  std::vector<std::tuple<uint64_t, uint32_t>> retries;
1703 
1704  ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1705  if( rsp->status.bdy.dlen > 0 )
1706  {
1707  ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1708  size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1709  retries.reserve( pgcnt );
1710  kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1711  sizeof( ServerResponseBody_pgWrCSE ) );
1712 
1713  for( size_t i = 0; i < pgcnt; ++i )
1714  {
1715  uint32_t len = XrdSys::PageSize;
1716  if( i == 0 ) len = cse->dlFirst;
1717  else if( i == pgcnt - 1 ) len = cse->dlLast;
1718  retries.push_back( std::make_tuple( pgoffs[i], len ) );
1719  }
1720  }
1721 
1722  RetryInfo *info = new RetryInfo( std::move( retries ) );
1723  AnyObject *obj = new AnyObject();
1724  obj->Set( info );
1725  response = obj;
1726 
1727  return Status();
1728  }
1729 
1730 
1731  //------------------------------------------------------------------------
1732  // kXR_readv - we need to pass the length of the buffer to the user code
1733  //------------------------------------------------------------------------
1734  case kXR_readv:
1735  {
1736  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1737  "VectorReadInfo", pUrl.GetHostId().c_str(),
1738  pRequest->GetObfuscatedDescription().c_str() );
1739 
1740  for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1741  {
1742  //--------------------------------------------------------------------
1743  // we are expecting to have only the header in the message, the raw
1744  // data have been readout into the user buffer
1745  //--------------------------------------------------------------------
1746  if( pPartialResps[i]->GetSize() > 8 )
1747  return Status( stOK, errInternal );
1748  }
1749  //----------------------------------------------------------------------
1750  // we are expecting to have only the header in the message, the raw
1751  // data have been readout into the user buffer
1752  //----------------------------------------------------------------------
1753  if( pResponse->GetSize() > 8 )
1754  return Status( stOK, errInternal );
1755  //----------------------------------------------------------------------
1756  // Get the response for the end user
1757  //----------------------------------------------------------------------
1758  return pBodyReader->GetResponse( response );
1759  }
1760 
1761  //------------------------------------------------------------------------
1762  // kXR_fattr
1763  //------------------------------------------------------------------------
1764  case kXR_fattr:
1765  {
1766  int len = rsp->hdr.dlen;
1767  char* data = rsp->body.buffer.data;
1768 
1769  return ParseXAttrResponse( data, len, response );
1770  }
1771 
1772  //------------------------------------------------------------------------
1773  // kXR_query
1774  //------------------------------------------------------------------------
1775  case kXR_query:
1776  case kXR_set:
1777  case kXR_prepare:
1778  default:
1779  {
1780  AnyObject *obj = new AnyObject();
1781  log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1782  pUrl.GetHostId().c_str(),
1783  pRequest->GetObfuscatedDescription().c_str() );
1784 
1785  BinaryDataInfo *data = new BinaryDataInfo();
1786  data->Allocate( length );
1787  data->Append( buffer, length );
1788  obj->Set( data );
1789  response = obj;
1790  return Status();
1791  }
1792  };
1793  return Status( stError, errInvalidMessage );
1794  }
1795 
1796  //------------------------------------------------------------------------
1797  // Parse the response to kXR_fattr request and put it in an object that
1798  // could be passed to the user
1799  //------------------------------------------------------------------------
1800  Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1801  AnyObject *&response )
1802  {
1803  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1804 // Log *log = DefaultEnv::GetLog(); //TODO
1805 
1806  switch( req->fattr.subcode )
1807  {
1808  case kXR_fattrDel:
1809  case kXR_fattrSet:
1810  {
1811  Status status;
1812 
1813  kXR_char nerrs = 0;
1814  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1815  return status;
1816 
1817  kXR_char nattr = 0;
1818  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1819  return status;
1820 
1821  std::vector<XAttrStatus> resp;
1822  // read the namevec
1823  for( kXR_char i = 0; i < nattr; ++i )
1824  {
1825  kXR_unt16 rc = 0;
1826  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1827  return status;
1828  rc = ntohs( rc );
1829 
1830  // count errors
1831  if( rc ) --nerrs;
1832 
1833  std::string name;
1834  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1835  return status;
1836 
1837  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1838  XRootDStatus();
1839  resp.push_back( XAttrStatus( name, st ) );
1840  }
1841 
1842  // check if we read all the data and if the error count is OK
1843  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1844 
1845  // set up the response object
1846  response = new AnyObject();
1847  response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1848 
1849  return Status();
1850  }
1851 
1852  case kXR_fattrGet:
1853  {
1854  Status status;
1855 
1856  kXR_char nerrs = 0;
1857  if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1858  return status;
1859 
1860  kXR_char nattr = 0;
1861  if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1862  return status;
1863 
1864  std::vector<XAttr> resp;
1865  resp.reserve( nattr );
1866 
1867  // read the name vec
1868  for( kXR_char i = 0; i < nattr; ++i )
1869  {
1870  kXR_unt16 rc = 0;
1871  if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1872  return status;
1873  rc = ntohs( rc );
1874 
1875  // count errors
1876  if( rc ) --nerrs;
1877 
1878  std::string name;
1879  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1880  return status;
1881 
1882  XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1883  XRootDStatus();
1884  resp.push_back( XAttr( name, st ) );
1885  }
1886 
1887  // read the value vec
1888  for( kXR_char i = 0; i < nattr; ++i )
1889  {
1890  kXR_int32 vlen = 0;
1891  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1892  return status;
1893  vlen = ntohl( vlen );
1894 
1895  std::string value;
1896  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1897  return status;
1898 
1899  resp[i].value.swap( value );
1900  }
1901 
1902  // check if we read all the data and if the error count is OK
1903  if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1904 
1905  // set up the response object
1906  response = new AnyObject();
1907  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1908 
1909  return Status();
1910  }
1911 
1912  case kXR_fattrList:
1913  {
1914  Status status;
1915  std::vector<XAttr> resp;
1916 
1917  while( len > 0 )
1918  {
1919  std::string name;
1920  if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1921  return status;
1922 
1923  kXR_int32 vlen = 0;
1924  if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1925  return status;
1926  vlen = ntohl( vlen );
1927 
1928  std::string value;
1929  if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1930  return status;
1931 
1932  resp.push_back( XAttr( name, value ) );
1933  }
1934 
1935  // set up the response object
1936  response = new AnyObject();
1937  response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1938 
1939  return Status();
1940  }
1941 
1942  default:
1943  return Status( stError, errDataError );
1944  }
1945  }
1946 
1947  //----------------------------------------------------------------------------
1948  // Perform the changes to the original request needed by the redirect
1949  // procedure - allocate new streamid, append redirection data and such
1950  //----------------------------------------------------------------------------
1951  Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1952  {
1953  Log *log = DefaultEnv::GetLog();
1954 
1955  Status st;
1956  // Append any "xrd.*" parameters present in newCgi so that any authentication
1957  // requirements are properly enforced
1958  const URL::ParamsMap &newCgi = newUrl.GetParams();
1959  std::string xrdCgi = "";
1960  std::ostringstream ossXrd;
1961  for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1962  {
1963  if( it->first.compare( 0, 4, "xrd." ) )
1964  continue;
1965  ossXrd << it->first << '=' << it->second << '&';
1966  }
1967 
1968  xrdCgi = ossXrd.str();
1969  // Redirection URL containing also any original xrd.* opaque parameters
1970  XrdCl::URL authUrl;
1971 
1972  if (xrdCgi.empty())
1973  {
1974  authUrl = newUrl;
1975  }
1976  else
1977  {
1978  std::string surl = newUrl.GetURL();
1979  (surl.find('?') == std::string::npos) ? (surl += '?') :
1980  ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1981  surl += xrdCgi;
1982  if (!authUrl.FromString(surl))
1983  {
1984  std::string surlLog = surl;
1985  if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1986  surlLog = obfuscateAuth(surlLog);
1987  }
1988  log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1989  newUrl.GetHostId().c_str(), surl.c_str());
1990  return Status(stError, errInvalidRedirectURL);
1991  }
1992  }
1993 
1994  //--------------------------------------------------------------------------
1995  // Rewrite particular requests
1996  //--------------------------------------------------------------------------
1998  MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
2000  return Status();
2001  }
2002 
2003  //----------------------------------------------------------------------------
2004  // Some requests need to be rewritten also after getting kXR_wait
2005  //----------------------------------------------------------------------------
2006  Status XRootDMsgHandler::RewriteRequestWait()
2007  {
2008  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2009 
2011 
2012  //------------------------------------------------------------------------
2013  // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
2014  // turned off after wait
2015  //------------------------------------------------------------------------
2016  switch( req->header.requestid )
2017  {
2018  case kXR_locate:
2019  {
2020  uint16_t refresh = kXR_refresh;
2021  req->locate.options &= (~refresh);
2022  break;
2023  }
2024 
2025  case kXR_open:
2026  {
2027  uint16_t refresh = kXR_refresh;
2028  req->locate.options &= (~refresh);
2029  break;
2030  }
2031  }
2032 
2033  XRootDTransport::SetDescription( pRequest );
2035  return Status();
2036  }
2037 
2038  //----------------------------------------------------------------------------
2039  // Recover error
2040  //----------------------------------------------------------------------------
2041  void XRootDMsgHandler::HandleError( XRootDStatus status )
2042  {
2043  //--------------------------------------------------------------------------
2044  // If there was no error then do nothing
2045  //--------------------------------------------------------------------------
2046  if( status.IsOK() )
2047  return;
2048 
2049  if( pSidMgr && IsInFly() && (
2050  status.code == errOperationExpired ||
2051  status.code == errOperationInterrupted ) )
2052  {
2053  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2054  pSidMgr->TimeOutSID( req->header.streamid );
2055  }
2056 
2057  bool noreplicas = ( status.code == errErrorResponse &&
2058  status.errNo == kXR_noReplicas );
2059 
2060  if( !noreplicas ) pLastError = status;
2061 
2062  Log *log = DefaultEnv::GetLog();
2063  log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
2064  pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
2065  status.ToString().c_str() );
2066 
2067  //--------------------------------------------------------------------------
2068  // Check if it is a fatal TLS error that has been marked as potentially
2069  // recoverable, if yes check if we can downgrade from fatal to error.
2070  //--------------------------------------------------------------------------
2071  if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
2072  {
2073  if( pSslErrCnt < MaxSslErrRetry )
2074  {
2075  status.status &= ~stFatal; // switch off fatal&error bits
2076  status.status |= stError; // switch on error bit
2077  }
2078  ++pSslErrCnt; // count number of consecutive SSL errors
2079  }
2080  else
2081  pSslErrCnt = 0;
2082 
2083  //--------------------------------------------------------------------------
2084  // We have got an error message, we can recover it at the load balancer if:
2085  // 1) we haven't got it from the load balancer
2086  // 2) we have a load balancer assigned
2087  // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2088  // kXR_NotFound
2089  // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2090  //--------------------------------------------------------------------------
2091  if( status.code == errErrorResponse )
2092  {
2093  if( RetriableErrorResponse( status ) )
2094  {
2095  UpdateTriedCGI(status.errNo);
2096  if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2097  SwitchOnRefreshFlag();
2098  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2099  return;
2100  }
2101  else
2102  {
2103  pStatus = status;
2104  HandleRspOrQueue();
2105  return;
2106  }
2107  }
2108 
2109  //--------------------------------------------------------------------------
2110  // Nothing can be done if:
2111  // 1) a user timeout has occurred
2112  // 2) has a non-zero session id
2113  // 3) if another error occurred and the validity of the message expired
2114  //--------------------------------------------------------------------------
2115  if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2116  status.code == errOperationInterrupted || time(0) >= pExpiration )
2117  {
2118  log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2119  pUrl.GetHostId().c_str(),
2120  pRequest->GetObfuscatedDescription().c_str() );
2121  pStatus = status;
2122  HandleRspOrQueue();
2123  return;
2124  }
2125 
2126  //--------------------------------------------------------------------------
2127  // At this point we're left with connection errors, we recover them
2128  // at a load balancer if we have one and if not on the current server
2129  // until we get a response, an unrecoverable error or a timeout
2130  //--------------------------------------------------------------------------
2131  if( pLoadBalancer.url.IsValid() &&
2132  pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2133  {
2134  UpdateTriedCGI( kXR_ServerError );
2135  HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2136  return;
2137  }
2138  else
2139  {
2140  if( !status.IsFatal() && IsRetriable() )
2141  {
2142  log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2143  pUrl.GetHostId().c_str(),
2144  pRequest->GetObfuscatedDescription().c_str() );
2145 
2146  UpdateTriedCGI( kXR_ServerError );
2147  HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2148  return;
2149  }
2150  pStatus = status;
2151  HandleRspOrQueue();
2152  return;
2153  }
2154  }
2155 
2156  //----------------------------------------------------------------------------
2157  // Retry the message at another server
2158  //----------------------------------------------------------------------------
2159  Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2160  {
2161  if( &pRetryAtUrl != &url ) pRetryAtUrl = url;
2162  pRetryAtEntryType = entryType;
2163  const int sst = pSendingState.fetch_or( kRetryAtSrv );
2164 
2165  //--------------------------------------------------------------------------
2166  // wait for any delayed send notification now. The handler may be requeued
2167  // during this function.
2168  //--------------------------------------------------------------------------
2169  if( ( sst & kSawReadySend ) && !( sst & kSendDone ) ) return Status();
2170  pSendingState &= ~kRetryAtSrv;
2171 
2172  pResponse.reset();
2173  Log *log = DefaultEnv::GetLog();
2174 
2175  //--------------------------------------------------------------------------
2176  // Set up a redirect entry
2177  //--------------------------------------------------------------------------
2178  if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2179  pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2180 
2181  if( pUrl.GetLocation() != url.GetLocation() )
2182  {
2183  pHosts->push_back( url );
2184 
2185  //------------------------------------------------------------------------
2186  // Assign a new stream id to the message
2187  //------------------------------------------------------------------------
2188 
2189  // first release the old stream id
2190  // (though it could be a redirect from a local
2191  // metalink file, in this case there's no SID)
2192  ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2193  if( pSidMgr )
2194  {
2195  pSidMgr->ReleaseSID( req->streamid );
2196  pSidMgr.reset();
2197  }
2198 
2199  // then get the new SIDManager
2200  // (again this could be a redirect to a local
2201  // file and in this case there is no SID)
2202  if( !url.IsLocalFile() )
2203  {
2204  pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2205  Status st = pSidMgr->AllocateSID( req->streamid );
2206  if( !st.IsOK() )
2207  {
2208  log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2209  pUrl.GetHostId().c_str(),
2210  pRequest->GetObfuscatedDescription().c_str() );
2211  return st;
2212  }
2213  }
2214 
2215  pUrl = url;
2216  }
2217 
2218  if( pUrl.IsMetalink() && pFollowMetalink )
2219  {
2220  log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2221  pUrl.GetHostId().c_str(), (void*)this,
2222  pRequest->GetObfuscatedDescription().c_str() );
2223 
2224  return pPostMaster->Redirect( pUrl, pRequest, this );
2225  }
2226  else if( pUrl.IsLocalFile() )
2227  {
2228  HandleLocalRedirect( &pUrl );
2229  return Status();
2230  }
2231  else
2232  {
2233  log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2234  pUrl.GetHostId().c_str(), (void*)this,
2235  pRequest->GetObfuscatedDescription().c_str() );
2236  return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2237  }
2238  }
2239 
2240  //----------------------------------------------------------------------------
2241  // Update the "tried=" part of the CGI of the current message
2242  //----------------------------------------------------------------------------
2243  void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2244  {
2245  URL::ParamsMap cgi;
2246  std::string tried;
2247 
2248  //--------------------------------------------------------------------------
2249  // In case a data server responded with a kXR_redirect and we fail at the
2250  // node where we were redirected to, the original data server should be
2251  // included in the tried CGI opaque info (instead of the current one).
2252  //--------------------------------------------------------------------------
2253  if( pEffectiveDataServerUrl )
2254  {
2255  tried = pEffectiveDataServerUrl->GetHostName();
2256  delete pEffectiveDataServerUrl;
2257  pEffectiveDataServerUrl = 0;
2258  }
2259  //--------------------------------------------------------------------------
2260  // Otherwise use the current URL.
2261  //--------------------------------------------------------------------------
2262  else
2263  tried = pUrl.GetHostName();
2264 
2265  // Report the reason for the failure to the next location
2266  //
2267  if (errNo)
2268  { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2269  else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2270  else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2271  else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2272  }
2273 
2274  //--------------------------------------------------------------------------
2275  // If our current load balancer is a metamanager and we failed either
2276  // at a diskserver or at an unidentified node we also exclude the last
2277  // known manager
2278  //--------------------------------------------------------------------------
2279  if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2280  {
2281  HostList::reverse_iterator it;
2282  for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2283  {
2284  if( it->loadBalancer )
2285  break;
2286 
2287  tried += "," + it->url.GetHostName();
2288 
2289  if( it->flags & kXR_isManager )
2290  break;
2291  }
2292  }
2293 
2294  cgi["tried"] = tried;
2296  MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2298  }
2299 
2300  //----------------------------------------------------------------------------
2301  // Switch on the refresh flag for some requests
2302  //----------------------------------------------------------------------------
2303  void XRootDMsgHandler::SwitchOnRefreshFlag()
2304  {
2306  ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2307  switch( req->header.requestid )
2308  {
2309  case kXR_locate:
2310  {
2311  req->locate.options |= kXR_refresh;
2312  break;
2313  }
2314 
2315  case kXR_open:
2316  {
2317  req->locate.options |= kXR_refresh;
2318  break;
2319  }
2320  }
2321  XRootDTransport::SetDescription( pRequest );
2323  }
2324 
2325  //------------------------------------------------------------------------
2326  // If the current thread is a worker thread from our thread-pool
2327  // handle the response, otherwise submit a new task to the thread-pool
2328  //------------------------------------------------------------------------
2329  void XRootDMsgHandler::HandleRspOrQueue()
2330  {
2331  //--------------------------------------------------------------------------
2332  // Is it a final response?
2333  //--------------------------------------------------------------------------
2334  bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
2335  if( finalrsp )
2336  {
2337  // Do not do final processing of the response if we haven't had
2338  // confirmation the original request was sent (via OnStatusReady).
2339  // The final processing will be triggered when we get the confirm.
2340  const int sst = pSendingState.fetch_or( kFinalResp );
2341  if( ( sst & kSawReadySend ) && !( sst & kSendDone ) )
2342  return;
2343  }
2344 
2345  JobManager *jobMgr = pPostMaster->GetJobManager();
2346  if( jobMgr->IsWorker() )
2347  HandleResponse();
2348  else
2349  {
2350  Log *log = DefaultEnv::GetLog();
2351  log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2352  pUrl.GetHostId().c_str(), (void*)this,
2353  pRequest->GetObfuscatedDescription().c_str() );
2354  jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2355  }
2356  }
2357 
2358  //------------------------------------------------------------------------
2359  // Notify the FileStateHandler to retry Open() with new URL
2360  //------------------------------------------------------------------------
2361  void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2362  {
2363  Log *log = DefaultEnv::GetLog();
2364  log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2365  pUrl.GetHostId().c_str(), (void*)this,
2366  pRequest->GetObfuscatedDescription().c_str() );
2367 
2368  if( !pLFileHandler )
2369  {
2370  HandleError( XRootDStatus( stFatal, errNotSupported ) );
2371  return;
2372  }
2373 
2374  AnyObject *resp = 0;
2375  pLFileHandler->SetHostList( *pHosts );
2376  XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2377  if( !st.IsOK() )
2378  {
2379  HandleError( st );
2380  return;
2381  }
2382 
2383  pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2384  resp,
2385  pHosts.release() );
2386  delete this;
2387 
2388  return;
2389  }
2390 
2391  //------------------------------------------------------------------------
2392  // Check if it is OK to retry this request
2393  //------------------------------------------------------------------------
2394  bool XRootDMsgHandler::IsRetriable()
2395  {
2396  std::string value;
2397  DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2398  if( value == "true" ) return true;
2399 
2400  // check if it is a mutable open (open + truncate or open + create)
2401  ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2402  if( req->header.requestid == htons( kXR_open ) )
2403  {
2404  bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2405  ( req->open.options & htons( kXR_new ) );
2406 
2407  if( _mutable )
2408  {
2409  Log *log = DefaultEnv::GetLog();
2410  log->Debug( XRootDMsg,
2411  "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2412  pUrl.GetHostId().c_str(),
2413  pRequest->GetObfuscatedDescription().c_str() );
2414  // disallow retry if it is a mutable open
2415  return false;
2416  }
2417  }
2418 
2419  return true;
2420  }
2421 
2422  //------------------------------------------------------------------------
2423  // Check if for given request and Metalink redirector it is OK to omit
2424  // the kXR_wait and proceed straight to the next entry in the Metalink file
2425  //------------------------------------------------------------------------
2426  bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2427  {
2428  // we can omit kXR_wait only if we have a Metalink redirector
2429  if( !url.IsMetalink() )
2430  return false;
2431 
2432  // we can omit kXR_wait only for requests that can be redirected
2433  // (kXR_read is the only stateful request that can be redirected)
2434  ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2435  if( pStateful && req->header.requestid != kXR_read )
2436  return false;
2437 
2438  // we can only omit kXR_wait if the Metalink redirect has more
2439  // replicas
2440  RedirectorRegistry &registry = RedirectorRegistry::Instance();
2441  VirtualRedirector *redirector = registry.Get( url );
2442 
2443  // we need more than one server as the current one is not reflected
2444  // in tried CGI
2445  if( redirector->Count( request ) > 1 )
2446  return true;
2447 
2448  return false;
2449  }
2450 
2451  //------------------------------------------------------------------------
2452  // Checks if the given error returned by server is retriable.
2453  //------------------------------------------------------------------------
2454  bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2455  {
2456  // we can only retry error response if we have a valid load-balancer and
2457  // it is not our current URL
2458  if( !( pLoadBalancer.url.IsValid() &&
2459  pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2460  return false;
2461 
2462  // following errors are retriable at any load-balancer
2463  if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2464  status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2465  status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2466  return true;
2467 
2468  // check if the load-balancer is a meta-manager, if yes there are
2469  // more errors that can be recovered
2470  if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2471 
2472  // those errors are retriable for meta-managers
2473  if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2474  return true;
2475 
2476  // in case of not-authorized error there is an imposed upper limit
2477  // on how many times we can retry this error
2478  if( status.errNo == kXR_NotAuthorized )
2479  {
2480  int limit = DefaultNotAuthorizedRetryLimit;
2481  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2482  bool ret = pNotAuthorizedCounter < limit;
2483  ++pNotAuthorizedCounter;
2484  if( !ret )
2485  {
2486  Log *log = DefaultEnv::GetLog();
2487  log->Error( XRootDMsg,
2488  "[%s] Reached limit of NotAuthorized retries!",
2489  pUrl.GetHostId().c_str() );
2490  }
2491  return ret;
2492  }
2493 
2494  // check if the load-balancer is a virtual (metalink) redirector,
2495  // if yes there are even more errors that can be recovered
2496  if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2497 
2498  // those errors are retriable for virtual (metalink) redirectors
2499  if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2500  return true;
2501 
2502  // otherwise it is a non-retriable error
2503  return false;
2504  }
2505 
2506  //------------------------------------------------------------------------
2507  // Dump the redirect-trace-back into the log file
2508  //------------------------------------------------------------------------
2509  void XRootDMsgHandler::DumpRedirectTraceBack()
2510  {
2511  if( pRedirectTraceBack.empty() ) return;
2512 
2513  std::stringstream sstrm;
2514 
2515  sstrm << "Redirect trace-back:\n";
2516 
2517  int counter = 0;
2518 
2519  auto itr = pRedirectTraceBack.begin();
2520  sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2521 
2522  auto prev = itr;
2523  ++itr;
2524  ++counter;
2525 
2526  for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2527  sstrm << '\t' << counter << ". "
2528  << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2529 
2530  int authlimit = DefaultNotAuthorizedRetryLimit;
2531  DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2532 
2533  bool warn = !pStatus.IsOK() &&
2534  ( pStatus.code == errNotFound ||
2535  pStatus.code == errRedirectLimit ||
2536  ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2537 
2538  Log *log = DefaultEnv::GetLog();
2539  if( warn )
2540  log->Warning( XRootDMsg, "%s", sstrm.str().c_str() );
2541  else
2542  log->Debug( XRootDMsg, "%s", sstrm.str().c_str() );
2543  }
2544 
2545  // Read data from buffer
2546  //------------------------------------------------------------------------
2547  template<typename T>
2548  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2549  {
2550  if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2551 
2552  memcpy(&result, buffer, sizeof(T));
2553 
2554  buffer += sizeof( T );
2555  buflen -= sizeof( T );
2556 
2557  return Status();
2558  }
2559 
2560  //------------------------------------------------------------------------
2561  // Read a string from buffer
2562  //------------------------------------------------------------------------
2563  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2564  {
2565  Status status;
2566  char c = 0;
2567 
2568  while( true )
2569  {
2570  if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2571  return status;
2572 
2573  if( c == 0 ) break;
2574  result += c;
2575  }
2576 
2577  return status;
2578  }
2579 
2580  //------------------------------------------------------------------------
2581  // Read a string from buffer
2582  //------------------------------------------------------------------------
2583  Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2584  size_t size, std::string &result )
2585  {
2586  Status status;
2587 
2588  if( size > buflen ) return Status( stError, errDataError );
2589 
2590  result.append( buffer, size );
2591  buffer += size;
2592  buflen -= size;
2593 
2594  return status;
2595  }
2596 
2597 }
@ kXR_NotAuthorized
Definition: XProtocol.hh:1042
@ kXR_NotFound
Definition: XProtocol.hh:1043
@ kXR_FileLocked
Definition: XProtocol.hh:1035
@ kXR_noReplicas
Definition: XProtocol.hh:1061
@ kXR_Unsupported
Definition: XProtocol.hh:1045
@ kXR_ServerError
Definition: XProtocol.hh:1044
@ kXR_Overloaded
Definition: XProtocol.hh:1056
@ kXR_ArgTooLong
Definition: XProtocol.hh:1034
@ kXR_noserver
Definition: XProtocol.hh:1046
@ kXR_IOError
Definition: XProtocol.hh:1039
@ kXR_FSError
Definition: XProtocol.hh:1037
@ kXR_NoMemory
Definition: XProtocol.hh:1040
#define kXR_isManager
Definition: XProtocol.hh:1198
union ServerResponse::@0 body
@ kXR_fattrDel
Definition: XProtocol.hh:300
@ kXR_fattrSet
Definition: XProtocol.hh:303
@ kXR_fattrList
Definition: XProtocol.hh:302
@ kXR_fattrGet
Definition: XProtocol.hh:301
struct ClientFattrRequest fattr
Definition: XProtocol.hh:896
#define kXR_collapseRedir
Definition: XProtocol.hh:1209
ServerResponseStatus status
Definition: XProtocol.hh:1352
#define kXR_attrMeta
Definition: XProtocol.hh:1201
kXR_char streamid[2]
Definition: XProtocol.hh:158
kXR_char streamid[2]
Definition: XProtocol.hh:956
kXR_unt16 options
Definition: XProtocol.hh:513
struct ClientDirlistRequest dirlist
Definition: XProtocol.hh:894
static const int kXR_ckpXeq
Definition: XProtocol.hh:218
@ kXR_delete
Definition: XProtocol.hh:483
@ kXR_refresh
Definition: XProtocol.hh:489
@ kXR_new
Definition: XProtocol.hh:485
@ kXR_retstat
Definition: XProtocol.hh:493
struct ClientOpenRequest open
Definition: XProtocol.hh:902
@ kXR_waitresp
Definition: XProtocol.hh:948
@ kXR_redirect
Definition: XProtocol.hh:946
@ kXR_oksofar
Definition: XProtocol.hh:942
@ kXR_status
Definition: XProtocol.hh:949
@ kXR_ok
Definition: XProtocol.hh:941
@ kXR_attn
Definition: XProtocol.hh:943
@ kXR_wait
Definition: XProtocol.hh:947
@ kXR_error
Definition: XProtocol.hh:945
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1304
struct ClientRequestHdr header
Definition: XProtocol.hh:887
#define kXR_recoverWrts
Definition: XProtocol.hh:1208
kXR_unt16 requestid
Definition: XProtocol.hh:159
@ kXR_read
Definition: XProtocol.hh:126
@ kXR_open
Definition: XProtocol.hh:123
@ kXR_writev
Definition: XProtocol.hh:144
@ kXR_readv
Definition: XProtocol.hh:138
@ kXR_mkdir
Definition: XProtocol.hh:121
@ kXR_sync
Definition: XProtocol.hh:129
@ kXR_chmod
Definition: XProtocol.hh:115
@ kXR_dirlist
Definition: XProtocol.hh:117
@ kXR_fattr
Definition: XProtocol.hh:133
@ kXR_rm
Definition: XProtocol.hh:127
@ kXR_query
Definition: XProtocol.hh:114
@ kXR_write
Definition: XProtocol.hh:132
@ kXR_set
Definition: XProtocol.hh:131
@ kXR_rmdir
Definition: XProtocol.hh:128
@ kXR_truncate
Definition: XProtocol.hh:141
@ kXR_protocol
Definition: XProtocol.hh:119
@ kXR_mv
Definition: XProtocol.hh:122
@ kXR_ping
Definition: XProtocol.hh:124
@ kXR_stat
Definition: XProtocol.hh:130
@ kXR_pgread
Definition: XProtocol.hh:143
@ kXR_chkpoint
Definition: XProtocol.hh:125
@ kXR_locate
Definition: XProtocol.hh:140
@ kXR_close
Definition: XProtocol.hh:116
@ kXR_pgwrite
Definition: XProtocol.hh:139
@ kXR_prepare
Definition: XProtocol.hh:134
#define kXR_isServer
Definition: XProtocol.hh:1199
#define kXR_attrVirtRdr
Definition: XProtocol.hh:1204
struct ClientChkPointRequest chkpoint
Definition: XProtocol.hh:890
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1303
union ServerResponseV2::@1 info
#define kXR_PROTOCOLVERSION
Definition: XProtocol.hh:70
@ kXR_vfs
Definition: XProtocol.hh:799
struct ClientStatRequest stat
Definition: XProtocol.hh:915
kXR_char options
Definition: XProtocol.hh:809
#define kXR_ecRedir
Definition: XProtocol.hh:1210
struct ClientLocateRequest locate
Definition: XProtocol.hh:898
ServerResponseHeader hdr
Definition: XProtocol.hh:1330
long long kXR_int64
Definition: XPtypes.hh:98
int kXR_int32
Definition: XPtypes.hh:89
unsigned short kXR_unt16
Definition: XPtypes.hh:67
unsigned char kXR_char
Definition: XPtypes.hh:65
#define unlikely(x)
std::string obfuscateAuth(const std::string &input)
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
Definition: XrdClBuffer.hh:156
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
Definition: XrdClBuffer.hh:72
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
uint32_t GetCursor() const
Get append cursor.
Definition: XrdClBuffer.hh:140
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
Definition: XrdClBuffer.hh:189
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
void SetHostList(const HostList &hostList)
XRootDStatus Open(const std::string &url, uint16_t flags, uint16_t mode, ResponseHandler *handler, time_t timeout=0)
Handle diagnostics.
Definition: XrdClLog.hh:101
@ ErrorMsg
report errors
Definition: XrdClLog.hh:109
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
JobManager * GetJobManager()
Get the job manager object user by the post master.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
A network socket.
Definition: XrdClSocket.hh:43
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
Definition: XrdClSocket.cc:461
bool IsEncrypted()
Definition: XrdClSocket.cc:867
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition: XrdClURL.cc:465
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition: XrdClURL.cc:62
void SetPassword(const std::string &password)
Set the password.
Definition: XrdClURL.hh:161
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetURL() const
Get the URL.
Definition: XrdClURL.hh:86
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:344
const std::string & GetUserName() const
Get the username.
Definition: XrdClURL.hh:135
const std::string & GetPassword() const
Get the password.
Definition: XrdClURL.hh:153
bool IsLocalFile() const
Definition: XrdClURL.cc:474
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
void SetUserName(const std::string &userName)
Set the username.
Definition: XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition: XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Definition: XrdClUtils.cc:703
Handle/Process/Forward XRootD messages.
const Message * GetRequest() const
Get the request pointer.
virtual uint16_t InspectStatusRsp() override
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
virtual bool IsRaw() const override
Are we a raw writer or not?
const std::string & GetErrorMessage() const
Get error message.
static void SetDescription(Message *msg)
Get the description of a message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errRedirectLimit
Definition: XrdClStatus.hh:102
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const uint16_t errTlsError
Definition: XrdClStatus.hh:80
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errNotFound
Definition: XrdClStatus.hh:100
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
const uint16_t errInvalidRedirectURL
Definition: XrdClStatus.hh:98
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
Definition: XrdClStatus.hh:106
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
const uint16_t errInvalidMessage
Definition: XrdClStatus.hh:85
none object for initializing empty Optional
XrdSysError Log
Definition: XrdConfig.cc:113
@ kXR_PartialResult
Definition: XProtocol.hh:1293
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
uint32_t flags
Host type.
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version