XRootD
XrdClStream.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClStream.hh"
26 #include "XrdCl/XrdClSocket.hh"
27 #include "XrdCl/XrdClChannel.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClLog.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClOutQueue.hh"
34 #include "XrdCl/XrdClMonitor.hh"
39 
40 #include <sys/types.h>
41 #include <algorithm>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 
45 namespace XrdCl
46 {
47  //----------------------------------------------------------------------------
48  // Statics
49  //----------------------------------------------------------------------------
50  RAtomic_uint64_t Stream::sSessCntGen{0};
51 
52  //----------------------------------------------------------------------------
53  // Incoming message helper
54  //----------------------------------------------------------------------------
56  {
57  InMessageHelper( Message *message = 0,
58  MsgHandler *hndlr = 0,
59  time_t expir = 0,
60  uint16_t actio = 0 ):
61  msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62  void Reset()
63  {
64  msg = 0; handler = 0; expires = 0; action = 0;
65  }
68  time_t expires;
69  uint16_t action;
70  };
71 
72  //----------------------------------------------------------------------------
73  // Sub stream helper
74  //----------------------------------------------------------------------------
76  {
77  SubStreamData(): socket( 0 ), status( Socket::Disconnected )
78  {
79  outQueue = new OutQueue();
80  }
82  {
83  delete socket;
84  delete outQueue;
85  }
91  };
92 
93 
94  //----------------------------------------------------------------------------
95  // Notify the mutex a close (which may block waiting for the Poller) is
96  // about to happen for one of the subStreams.
97  //----------------------------------------------------------------------------
98  void StreamMutex::AddClosing( uint16_t subStream )
99  {
100  XrdSysCondVarHelper lck( mcv );
101  mclosing[subStream]++;
102  mcv.Broadcast();
103  }
104 
105  //----------------------------------------------------------------------------
106  // Notify the mutex a close has completed.
107  //----------------------------------------------------------------------------
108  void StreamMutex::RemoveClosing( uint16_t subStream )
109  {
110  XrdSysCondVarHelper lck( mcv );
111  mclosing[subStream]--;
112  if( mclosing[subStream]==0 ) mclosing.erase( subStream );
113  mcv.Broadcast();
114  }
115 
116  //----------------------------------------------------------------------------
117  // Lock
118  //----------------------------------------------------------------------------
120  {
121  XrdSysCondVarHelper lck( mcv );
122  if( mlist.empty() )
123  {
124  mlist.emplace_front();
125  ++mlist.front().cnt;
126  mthmap[XrdSysThread::ID()] = mlist.begin();
127  return;
128  }
129  while( 1 )
130  {
131  auto mit = mthmap.find( XrdSysThread::ID() );
132  if( mit == mthmap.end() )
133  {
134  mlist.emplace_back();
135  bool ins;
136  std::tie( mit, ins ) = mthmap.insert(
137  std::make_pair( XrdSysThread::ID(), std::prev( mlist.end() ) ) );
138  }
139  if( mit->second == mlist.begin() )
140  {
141  ++mlist.front().cnt;
142  return;
143  }
144  mcv.Wait();
145  }
146  }
147 
148  //----------------------------------------------------------------------------
149  // Lock, notifying the mutex we're acquiring the mute from with a callback
150  // from the given subStream. We may fail to acquiring by setting isclosing.
151  //----------------------------------------------------------------------------
152  void StreamMutex::Lock( uint16_t subStream, bool &isclosing )
153  {
154  isclosing = false;
155  XrdSysCondVarHelper lck( mcv );
156  if( mlist.empty() ) {
157  mlist.emplace_front();
158  ++mlist.front().cnt;
159  mthmap[XrdSysThread::ID()] = mlist.begin();
160  return;
161  }
162  while( 1 )
163  {
164  auto mit = mthmap.find( XrdSysThread::ID() );
165  if( mit == mthmap.end() )
166  {
167  mlist.emplace_back();
168  bool ins;
169  std::tie( mit, ins ) = mthmap.insert(
170  std::make_pair( XrdSysThread::ID(), std::prev( mlist.end() ) ) );
171  }
172  if( mit->second == mlist.begin() )
173  {
174  ++mlist.front().cnt;
175  return;
176  }
177  if( hasfn || mclosing.count( subStream ) )
178  {
179  isclosing = true;
180  mlist.erase( mit->second );
181  mthmap.erase( mit );
182  return;
183  }
184  mcv.Wait();
185  }
186  }
187 
188  //----------------------------------------------------------------------------
189  // Lock, notifying the mutex that any thread waiting for a lock from within a
190  // Poller callback should abort. We either acquire the mutex immediately or
191  // indicate that we could not, by setting isclosing. In that case the supplied
192  // func will be exectued by the last thread to release the lock.
193  // Only one func may be registered at a time. Others will not be called.
194  //----------------------------------------------------------------------------
195  void StreamMutex::Lock( const std::function<void()> &func, bool &isclosing )
196  {
197  isclosing = false;
198  XrdSysCondVarHelper lck( mcv );
199  if( mlist.empty() )
200  {
201  mlist.emplace_front( func );
202  ++mlist.front().cnt;
203  auto lit = mlist.begin();
204  mthmap[XrdSysThread::ID()] = lit;
205  fnlistit = lit;
206  hasfn = true;
207  return;
208  }
209  while( 1 )
210  {
211  auto mit = mthmap.find( XrdSysThread::ID() );
212  if( mit == mthmap.end() )
213  {
214  if( hasfn )
215  {
216  isclosing = true;
217  return;
218  }
219  mlist.emplace_back( func );
220  hasfn = true;
221  fnlistit = std::prev( mlist.end() );
222  mcv.Broadcast();
223  isclosing = true;
224  return;
225  }
226  if( mit->second == mlist.begin() )
227  {
228  ++mlist.front().cnt;
229  return;
230  }
231  mcv.Wait();
232  }
233  }
234 
235  //----------------------------------------------------------------------------
236  // UnLock
237  //----------------------------------------------------------------------------
239  {
240  // keep any fn callback until return in case it holds a ref count
241  std::function<void()> keepfn;
242 
243  XrdSysCondVarHelper lck( mcv );
244  auto mit = mthmap.find( XrdSysThread::ID() );
245  if( mit == mthmap.end() ) return;
246 
247  // we must have held the lock
248  assert( mit->second == mlist.begin() );
249 
250  const size_t cnt = --mlist.front().cnt;
251  if( cnt ) return;
252 
253  if( hasfn && fnlistit == mit->second )
254  {
255  hasfn = false;
256  std::swap( keepfn, mlist.front().fn );
257  }
258 
259  mlist.erase( mit->second );
260  mthmap.erase( mit );
261 
262  // next up should have zero count
263  assert( mlist.empty() || mlist.front().cnt == 0 );
264 
265  if( hasfn && fnlistit == mlist.begin() )
266  {
267  auto &lfn = mlist.front().fn;
268  ++mlist.front().cnt;
269  mthmap[XrdSysThread::ID()] = mlist.begin();
270  lck.UnLock();
271  lfn();
272  UnLock();
273  return;
274  }
275  mcv.Broadcast();
276  }
277 
278  //------------------------------------------------------------------------
279  // Job to handle disposing of socket outside of a poller callback
280  //------------------------------------------------------------------------
281  class SocketDestroyJob: public Job
282  {
283  public:
285  pSock( socket ) { }
286  virtual ~SocketDestroyJob() {}
287  virtual void Run( void* )
288  {
289  delete pSock;
290  delete this;
291  }
292  private:
293  AsyncSocketHandler *pSock;
294  };
295 
296  //----------------------------------------------------------------------------
297  // Constructor
298  //----------------------------------------------------------------------------
299  Stream::Stream( const URL *url, const URL &prefer ):
300  pUrl( url ),
301  pPrefer( prefer ),
302  pTransport( 0 ),
303  pPoller( 0 ),
304  pTaskManager( 0 ),
305  pJobManager( 0 ),
306  pIncomingQueue( 0 ),
307  pChannelData( 0 ),
308  pLastStreamError( 0 ),
309  pConnectionCount( 0 ),
310  pConnectionInitTime( 0 ),
311  pAddressType( Utils::IPAll ),
312  pSessionId( 0 ),
313  pBytesSent( 0 ),
314  pBytesReceived( 0 )
315  {
316  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
317  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
318 
319  std::ostringstream o;
320  o << pUrl->GetHostId();
321  pStreamName = o.str();
322 
323  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
325  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
327  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
329 
330  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
332 
333  pAddressType = Utils::String2AddressType( netStack );
334  if( pAddressType == Utils::AddressType::IPAuto )
335  {
336  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
337  if( !( stacks & XrdNetUtils::hasIP64 ) )
338  {
339  if( stacks & XrdNetUtils::hasIPv4 )
340  pAddressType = Utils::AddressType::IPv4;
341  else if( stacks & XrdNetUtils::hasIPv6 )
342  pAddressType = Utils::AddressType::IPv6;
343  }
344  }
345 
346  Log *log = DefaultEnv::GetLog();
347  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
348  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
349  "Window: %d", pStreamName.c_str(), netStack.c_str(),
350  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
351  }
352 
353  //----------------------------------------------------------------------------
354  // Destructor
355  //----------------------------------------------------------------------------
357  {
358  // Used to disconnect substreams here, but since we are refernce counted
359  // and connected substream hold a count, if we're here they're closed.
360 
361  Log *log = DefaultEnv::GetLog();
362  log->Debug( PostMasterMsg, "[%s] Destroying stream",
363  pStreamName.c_str() );
364 
365  MonitorDisconnection( XRootDStatus() );
366 
367  SubStreamList::iterator it;
368  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369  delete *it;
370  }
371 
372  //----------------------------------------------------------------------------
373  // Initializer
374  //----------------------------------------------------------------------------
376  {
377  if( !pTransport || !pPoller || !pChannelData )
379 
380  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
381  pChannelData, 0, this );
382  pSubStreams.push_back( new SubStreamData() );
383  pSubStreams[0]->socket = s;
384  return XRootDStatus();
385  }
386 
387  //------------------------------------------------------------------------
388  // Make sure that the underlying socket handler gets write readiness
389  // events
390  //------------------------------------------------------------------------
392  {
393  StreamMutexHelper scopedLock( pMutex );
394 
395  //--------------------------------------------------------------------------
396  // We are in the process of connecting the main stream, so we do nothing
397  // because when the main stream connection is established it will connect
398  // all the other streams
399  //--------------------------------------------------------------------------
400  if( pSubStreams[0]->status == Socket::Connecting )
401  return XRootDStatus();
402 
403  //--------------------------------------------------------------------------
404  // The main stream is connected, so we can verify whether we have
405  // the up and the down stream connected and ready to handle data.
406  // If anything is not right we fall back to stream 0.
407  //--------------------------------------------------------------------------
408  if( pSubStreams[0]->status == Socket::Connected )
409  {
410  if( pSubStreams[path.down]->status != Socket::Connected )
411  path.down = 0;
412 
413  if( pSubStreams[path.up]->status == Socket::Disconnected )
414  {
415  path.up = 0;
416  return pSubStreams[0]->socket->EnableUplink();
417  }
418 
419  if( pSubStreams[path.up]->status == Socket::Connected )
420  return pSubStreams[path.up]->socket->EnableUplink();
421 
422  return XRootDStatus();
423  }
424 
425  //--------------------------------------------------------------------------
426  // The main stream is not connected, we need to check whether enough time
427  // has passed since we last encountered an error (if any) so that we could
428  // re-attempt the connection
429  //--------------------------------------------------------------------------
430  Log *log = DefaultEnv::GetLog();
431  time_t now = ::time(0);
432 
433  if( now-pLastStreamError < pStreamErrorWindow )
434  return pLastFatalError;
435 
436  gettimeofday( &pConnectionStarted, 0 );
437  ++pConnectionCount;
438 
439  //--------------------------------------------------------------------------
440  // Resolve all the addresses of the host we're supposed to connect to
441  //--------------------------------------------------------------------------
442  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
443  if( !st.IsOK() )
444  {
445  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
446  "the host", pStreamName.c_str() );
447  pLastStreamError = now;
448  st.status = stFatal;
449  pLastFatalError = st;
450  return st;
451  }
452 
453  if( pPrefer.IsValid() )
454  {
455  std::vector<XrdNetAddr> addrresses;
456  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
457  if( !st.IsOK() )
458  {
459  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
460  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
461  }
462  else
463  {
464  std::vector<XrdNetAddr> tmp;
465  tmp.reserve( pAddresses.size() );
466  // first add all remaining addresses
467  auto itr = pAddresses.begin();
468  for( ; itr != pAddresses.end() ; ++itr )
469  {
470  if( !HasNetAddr( *itr, addrresses ) )
471  tmp.push_back( *itr );
472  }
473  // then copy all 'preferred' addresses
474  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
475  // and keep the result
476  pAddresses.swap( tmp );
477  }
478  }
479 
481  pAddresses );
482 
483  while( !pAddresses.empty() )
484  {
485  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
486  pAddresses.pop_back();
487  pConnectionInitTime = ::time( 0 );
488  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
489  if( st.IsOK() )
490  {
491  pSubStreams[0]->status = Socket::Connecting;
492  break;
493  }
494  }
495  return st;
496  }
497 
498  //----------------------------------------------------------------------------
499  // Queue the message for sending
500  //----------------------------------------------------------------------------
502  MsgHandler *handler,
503  bool stateful,
504  time_t expires )
505  {
506  StreamMutexHelper scopedLock( pMutex );
507  Log *log = DefaultEnv::GetLog();
508 
509  //--------------------------------------------------------------------------
510  // Check the session ID and bounce if needed
511  //--------------------------------------------------------------------------
512  if( msg->GetSessionId() &&
513  (pSubStreams[0]->status != Socket::Connected ||
514  pSessionId != msg->GetSessionId()) )
516 
517  //--------------------------------------------------------------------------
518  // Decide on the path to send the message
519  //--------------------------------------------------------------------------
520  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
521  if( pSubStreams.size() <= path.up )
522  {
523  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
524  "substream %d, using 0 instead", pStreamName.c_str(),
525  msg->GetObfuscatedDescription().c_str(), path.up );
526  path.up = 0;
527  }
528 
529  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
530  "substream %d expecting answer at %d", pStreamName.c_str(),
531  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
532 
533  //--------------------------------------------------------------------------
534  // Enable *a* path and insert the message to the right queue
535  //--------------------------------------------------------------------------
536  XRootDStatus st = EnableLink( path );
537  if( st.IsOK() )
538  {
539  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
540  handler->OnWaitingToSend( msg );
541  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
542  expires, stateful );
543  }
544  else
545  st.status = stFatal;
546  return st;
547  }
548 
549  //----------------------------------------------------------------------------
550  // Force connection
551  //----------------------------------------------------------------------------
553  {
554  StreamMutexHelper scopedLock( pMutex );
555  if( pSubStreams[0]->status == Socket::Connecting )
556  {
557  pSubStreams[0]->status = Socket::Disconnected;
558  XrdCl::PathID path( 0, 0 );
559  XrdCl::XRootDStatus st = EnableLink( path );
560  if( !st.IsOK() )
561  OnConnectError( 0, st );
562  }
563  }
564 
565  //----------------------------------------------------------------------------
566  // Disconnect the stream
567  //----------------------------------------------------------------------------
569  {
570  auto channel = GetChannel();
571  StreamMutexHelper scopedLock( pMutex );
572  SubStreamList::iterator it;
573  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
574  {
575  (*it)->socket->Close();
576  (*it)->status = Socket::Disconnected;
577  }
578  pSessionId = 0;
579  }
580 
581  //----------------------------------------------------------------------------
582  // Handle a clock event
583  //----------------------------------------------------------------------------
584  void Stream::Tick( time_t now )
585  {
586  //--------------------------------------------------------------------------
587  // Check for timed-out requests and incoming handlers
588  //--------------------------------------------------------------------------
589  StreamMutexHelper scopedLock( pMutex );
590  OutQueue q;
591  SubStreamList::iterator it;
592  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
593  q.GrabExpired( *(*it)->outQueue, now );
594  scopedLock.UnLock();
595 
597  pIncomingQueue->ReportTimeout( now );
598  }
599 }
600 
601 //------------------------------------------------------------------------------
602 // Handle message timeouts and reconnection in the future
603 //------------------------------------------------------------------------------
604 namespace
605 {
606  class StreamConnectorTask: public XrdCl::Task
607  {
608  public:
609  //------------------------------------------------------------------------
610  // Constructor
611  //------------------------------------------------------------------------
612  StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
613  url( url )
614  {
615  std::string name = "StreamConnectorTask for ";
616  name += n;
617  SetName( name );
618  }
619 
620  //------------------------------------------------------------------------
621  // Run the task
622  //------------------------------------------------------------------------
623  time_t Run( time_t )
624  {
626  return 0;
627  }
628 
629  private:
630  XrdCl::URL url;
631  };
632 }
633 
634 namespace XrdCl
635 {
636  XRootDStatus Stream::RequestClose( Message &response )
637  {
638  ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
639  if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
640  Message *msg;
641  ClientCloseRequest *req;
642  MessageUtils::CreateRequest( msg, req );
643  req->requestid = kXR_close;
644  memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
646  msg->SetSessionId( pSessionId );
647  NullResponseHandler *handler = new NullResponseHandler();
648  MessageSendParams params;
649  params.timeout = 0;
650  params.followRedirects = false;
651  params.stateful = true;
653  return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
654  }
655 
656  //------------------------------------------------------------------------
657  // Check if message is a partial response
658  //------------------------------------------------------------------------
659  bool Stream::IsPartial( Message &msg )
660  {
661  ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
662  if( rsphdr->status == kXR_oksofar )
663  return true;
664 
665  if( rsphdr->status == kXR_status )
666  {
667  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
669  return true;
670  }
671 
672  return false;
673  }
674 
675  //----------------------------------------------------------------------------
676  // Call back when a message has been reconstructed
677  //----------------------------------------------------------------------------
678  void Stream::OnIncoming( uint16_t subStream,
679  std::shared_ptr<Message> msg,
680  uint32_t bytesReceived )
681  {
682  msg->SetSessionId( pSessionId );
683  pBytesReceived += bytesReceived;
684 
685  MsgHandler *handler = nullptr;
686  uint16_t action = 0;
687  {
688  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
689  handler = mh.handler;
690  action = mh.action;
691  mh.Reset();
692  }
693 
694  if( !IsPartial( *msg ) )
695  {
696  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
697  *pChannelData );
698  if( streamAction & TransportHandler::DigestMsg )
699  return;
700 
701  if( streamAction & TransportHandler::RequestClose )
702  {
703  RequestClose( *msg );
704  return;
705  }
706  }
707 
708  Log *log = DefaultEnv::GetLog();
709 
710  //--------------------------------------------------------------------------
711  // No handler, we discard the message ...
712  //--------------------------------------------------------------------------
713  if( !handler )
714  {
715  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
716  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
717  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
718  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
719  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
720  return;
721  }
722 
723  //--------------------------------------------------------------------------
724  // We have a handler, so we call the callback
725  //--------------------------------------------------------------------------
726  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
727  pStreamName.c_str(), (void*)msg.get() );
728 
730  {
731  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
732  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
733 
734  // if we are handling partial response we have to take down the timeout fence
735  if( IsPartial( *msg ) )
736  {
737  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
738  if( xrdHandler ) xrdHandler->PartialReceived();
739  }
740 
741  return;
742  }
743 
744  Job *job = new HandleIncMsgJob( handler );
745  pJobManager->QueueJob( job );
746  }
747 
748  //----------------------------------------------------------------------------
749  // Call when one of the sockets is ready to accept a new message
750  //----------------------------------------------------------------------------
751  std::pair<Message *, MsgHandler *>
752  Stream::OnReadyToWrite( uint16_t subStream )
753  {
754  bool closing;
755  StreamMutexHelper scopedLock( pMutex, subStream, closing );
756  if( closing ) return std::make_pair( (Message *)0, (MsgHandler *)0 );
757  Log *log = DefaultEnv::GetLog();
758  if( pSubStreams[subStream]->outQueue->IsEmpty() )
759  {
760  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
761  pSubStreams[subStream]->socket->GetStreamName().c_str() );
762 
763  pSubStreams[subStream]->socket->DisableUplink();
764  return std::make_pair( (Message *)0, (MsgHandler *)0 );
765  }
766 
767  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
768  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
769  h.expires,
770  h.stateful );
771 
772  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
773  "from out-queue to in-queue, starting to send outgoing.",
774  pUrl->GetHostId().c_str(), (void*)h.handler,
775  h.msg->GetObfuscatedDescription().c_str() );
776 
777  scopedLock.UnLock();
778 
779  if( h.handler )
780  {
781  bool rmMsg = false;
782  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
783  if( rmMsg )
784  {
785  Log *log = DefaultEnv::GetLog();
786  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
787  pStreamName.c_str() );
788  }
789  h.handler->OnReadyToSend( h.msg );
790  }
791  return std::make_pair( h.msg, h.handler );
792  }
793 
794  void Stream::DisableIfEmpty( uint16_t subStream )
795  {
796  bool closing;
797  StreamMutexHelper scopedLock( pMutex, subStream, closing );
798  if( closing ) return;
799  Log *log = DefaultEnv::GetLog();
800 
801  if( pSubStreams[subStream]->outQueue->IsEmpty() )
802  {
803  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
804  pSubStreams[subStream]->socket->GetStreamName().c_str() );
805  pSubStreams[subStream]->socket->DisableUplink();
806  }
807  }
808 
809  //----------------------------------------------------------------------------
810  // Call when a message is written to the socket
811  //----------------------------------------------------------------------------
812  void Stream::OnMessageSent( uint16_t subStream,
813  Message *msg,
814  uint32_t bytesSent )
815  {
816  pTransport->MessageSent( msg, subStream, bytesSent,
817  *pChannelData );
818  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
819  pBytesSent += bytesSent;
820  if( h.handler )
821  {
822  // ensure expiration time is assigned if still in queue
823  pIncomingQueue->AssignTimeout( h.handler );
824  // OnStatusReady may cause the handler to delete itself, in
825  // which case the handler or the user callback may also delete msg
826  h.handler->OnStatusReady( msg, XRootDStatus() );
827  }
828  pSubStreams[subStream]->outMsgHelper.Reset();
829  }
830 
831  //----------------------------------------------------------------------------
832  // Call back when a message has been reconstructed
833  //----------------------------------------------------------------------------
834  void Stream::OnConnect( uint16_t subStream )
835  {
836  auto channel = GetChannel();
837  bool closing;
838  StreamMutexHelper scopedLock( pMutex, subStream, closing );
839  if( closing ) return;
840  pSubStreams[subStream]->status = Socket::Connected;
841 
842  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
843  Log *log = DefaultEnv::GetLog();
844  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
845  subStream, ipstack.c_str() );
846 
847  if( subStream == 0 )
848  {
849  pLastStreamError = 0;
850  pLastFatalError = XRootDStatus();
851  pConnectionCount = 0;
852  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
853  pSessionId = ++sSessCntGen;
854 
855  //------------------------------------------------------------------------
856  // Create the streams if they don't exist yet
857  //------------------------------------------------------------------------
858  if( pSubStreams.size() == 1 && numSub > 1 )
859  {
860  for( uint16_t i = 1; i < numSub; ++i )
861  {
862  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
863  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
864  pChannelData, i, this );
865  pSubStreams.push_back( new SubStreamData() );
866  pSubStreams[i]->socket = s;
867  }
868  }
869 
870  //------------------------------------------------------------------------
871  // Connect the extra streams, if we fail we move all the outgoing items
872  // to stream 0, we don't need to enable the uplink here, because it
873  // should be already enabled after the handshaking process is completed.
874  //------------------------------------------------------------------------
875  if( pSubStreams.size() > 1 )
876  {
877  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
878  pStreamName.c_str(), pSubStreams.size() - 1 );
879  for( size_t i = 1; i < pSubStreams.size(); ++i )
880  {
881  if( pSubStreams[i]->status != Socket::Disconnected )
882  {
883  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
884  SockHandlerClose( i );
885  }
886  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
887  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
888  if( !st.IsOK() )
889  {
890  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
891  SockHandlerClose( i );
892  }
893  else
894  {
895  pSubStreams[i]->status = Socket::Connecting;
896  }
897  }
898  }
899 
900  //------------------------------------------------------------------------
901  // Inform monitoring
902  //------------------------------------------------------------------------
903  pBytesSent = 0;
904  pBytesReceived = 0;
905  gettimeofday( &pConnectionDone, 0 );
907  if( mon )
908  {
910  i.server = pUrl->GetHostId();
911  i.sTOD = pConnectionStarted;
912  i.eTOD = pConnectionDone;
913  i.streams = pSubStreams.size();
914 
915  AnyObject qryResult;
916  std::string *qryResponse = nullptr;
917  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
918  qryResult.Get( qryResponse );
919 
920  if (qryResponse) {
921  i.auth = *qryResponse;
922  delete qryResponse;
923  } else {
924  i.auth = "";
925  }
926 
927  mon->Event( Monitor::EvConnect, &i );
928  }
929 
930  //------------------------------------------------------------------------
931  // For every connected control-stream call the global on-connect handler
932  //------------------------------------------------------------------------
934  }
935  else if( pOnDataConnJob )
936  {
937  //------------------------------------------------------------------------
938  // For every connected data-stream call the on-connect handler
939  //------------------------------------------------------------------------
940  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
941  }
942  }
943 
944  //----------------------------------------------------------------------------
945  // On connect error
946  //----------------------------------------------------------------------------
947  void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
948  {
949  auto channel = GetChannel();
950  bool closing;
951  StreamMutexHelper scopedLock( pMutex, subStream, closing );
952  if( closing ) return;
953  Log *log = DefaultEnv::GetLog();
954  SockHandlerClose( subStream );
955  time_t now = ::time(0);
956 
957  //--------------------------------------------------------------------------
958  // For every connection error call the global connection error handler
959  //--------------------------------------------------------------------------
961 
962  //--------------------------------------------------------------------------
963  // If we connected subStream == 0 and cannot connect >0 then we just give
964  // up and move the outgoing messages to another queue
965  //--------------------------------------------------------------------------
966  if( subStream > 0 )
967  {
968  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
969  if( pSubStreams[0]->status == Socket::Connected )
970  {
971  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
972  if( !st.IsOK() )
973  OnFatalError( 0, st, scopedLock );
974  return;
975  }
976 
977  if( pSubStreams[0]->status == Socket::Connecting )
978  return;
979 
980  OnFatalError( subStream, status, scopedLock );
981  return;
982  }
983 
984  //--------------------------------------------------------------------------
985  // Check if we still have time to try and do something in the current window
986  //--------------------------------------------------------------------------
987  time_t elapsed = now-pConnectionInitTime;
988  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
989  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
990 
991  //------------------------------------------------------------------------
992  // If we have some IP addresses left we try them
993  //------------------------------------------------------------------------
994  if( !pAddresses.empty() )
995  {
996  XRootDStatus st;
997  do
998  {
999  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
1000  pAddresses.pop_back();
1001  pConnectionInitTime = ::time( 0 );
1002  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
1003  }
1004  while( !pAddresses.empty() && !st.IsOK() );
1005 
1006  if( !st.IsOK() )
1007  OnFatalError( subStream, st, scopedLock );
1008  else
1009  pSubStreams[0]->status = Socket::Connecting;
1010 
1011  return;
1012  }
1013  //------------------------------------------------------------------------
1014  // If we still can retry with the same host name, we sleep until the end
1015  // of the connection window and try
1016  //------------------------------------------------------------------------
1017  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
1018  && !status.IsFatal() )
1019  {
1020  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
1021  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
1022 
1023  pSubStreams[0]->status = Socket::Connecting;
1024  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
1025  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
1026  return;
1027  }
1028  //--------------------------------------------------------------------------
1029  // We are out of the connection window, the only thing we can do here
1030  // is re-resolving the host name and retrying if we still can
1031  //--------------------------------------------------------------------------
1032  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
1033  {
1034  pAddresses.clear();
1035  pSubStreams[0]->status = Socket::Disconnected;
1036  PathID path( 0, 0 );
1037  XRootDStatus st = EnableLink( path );
1038  if( !st.IsOK() )
1039  OnFatalError( subStream, st, scopedLock );
1040  return;
1041  }
1042 
1043  //--------------------------------------------------------------------------
1044  // Else, we fail
1045  //--------------------------------------------------------------------------
1046  OnFatalError( subStream, status, scopedLock );
1047  }
1048 
1049  //----------------------------------------------------------------------------
1050  // Call back when an error has occurred
1051  //----------------------------------------------------------------------------
1052  void Stream::OnError( uint16_t subStream, XRootDStatus status )
1053  {
1054  auto channel = GetChannel();
1055  bool closing;
1056  StreamMutexHelper scopedLock( pMutex, subStream, closing );
1057  if( closing ) return;
1058  Log *log = DefaultEnv::GetLog();
1059  SockHandlerClose( subStream );
1060 
1061  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
1062  pStreamName.c_str(), subStream, status.ToString().c_str() );
1063 
1064  //--------------------------------------------------------------------------
1065  // Reinsert the stuff that we have failed to sent
1066  //--------------------------------------------------------------------------
1067  Reinsert( subStream );
1068 
1069  //--------------------------------------------------------------------------
1070  // We are dealing with an error of a peripheral stream. If we don't have
1071  // anything to send don't bother recovering. Otherwise move the requests
1072  // to stream 0 if possible.
1073  //--------------------------------------------------------------------------
1074  if( subStream > 0 )
1075  {
1076  if( pSubStreams[subStream]->outQueue->IsEmpty() )
1077  return;
1078 
1079  if( pSubStreams[0]->status != Socket::Disconnected )
1080  {
1081  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
1082  if( pSubStreams[0]->status == Socket::Connected )
1083  {
1084  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
1085  if( !st.IsOK() )
1086  OnFatalError( 0, st, scopedLock );
1087  return;
1088  }
1089  }
1090  OnFatalError( subStream, status, scopedLock );
1091  return;
1092  }
1093 
1094  //--------------------------------------------------------------------------
1095  // If we lost the stream 0 we have lost the session, we re-enable the
1096  // stream if we still have things in one of the outgoing queues, otherwise
1097  // there is not point to recover at this point.
1098  //--------------------------------------------------------------------------
1099  if( subStream == 0 )
1100  {
1101  MonitorDisconnection( status );
1102  pSessionId = 0;
1103 
1104  SubStreamList::iterator it;
1105  size_t outstanding = 0;
1106  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1107  outstanding += (*it)->outQueue->GetSizeStateless();
1108 
1109  if( outstanding )
1110  {
1111  PathID path( 0, 0 );
1112  XRootDStatus st = EnableLink( path );
1113  if( !st.IsOK() )
1114  {
1115  OnFatalError( 0, st, scopedLock );
1116  return;
1117  }
1118  }
1119 
1120  //------------------------------------------------------------------------
1121  // We're done here, unlock the stream mutex to avoid deadlocks and
1122  // report the disconnection event to the handlers
1123  //------------------------------------------------------------------------
1124  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1125  "message handlers.", pStreamName.c_str() );
1126  OutQueue q;
1127  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1128  q.GrabStateful( *(*it)->outQueue );
1129  scopedLock.UnLock();
1130 
1131  q.Report( status );
1132  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1133  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1134  return;
1135  }
1136  }
1137 
1138  //------------------------------------------------------------------------
1139  // Force error
1140  //------------------------------------------------------------------------
1141  void Stream::ForceError( XRootDStatus status, const bool hush, const uint64_t sess )
1142  {
1143  auto channel = GetChannel();
1144  bool closing;
1145  StreamMutexHelper scopedLock( pMutex,
1146  [this, channel, status, hush, sess]()
1147  {
1148  this->ForceError(status, hush, sess);
1149  }, closing );
1150  if( closing ) return;
1151  if( sess && sess != pSessionId ) return;
1152 
1153  Log *log = DefaultEnv::GetLog();
1154  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1155  {
1156  if( pSubStreams[substream]->status != Socket::Connected ) continue;
1157  SockHandlerClose( substream );
1158 
1159  if( !hush )
1160  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1161  pStreamName.c_str(), status.ToString().c_str() );
1162 
1163  //--------------------------------------------------------------------
1164  // Reinsert the stuff that we have failed to sent
1165  //--------------------------------------------------------------------
1166  Reinsert( substream );
1167  }
1168 
1169  pConnectionCount = 0;
1170  pSessionId = 0;
1171 
1172  //------------------------------------------------------------------------
1173  // We're done here, unlock the stream mutex to avoid deadlocks and
1174  // report the disconnection event to the handlers
1175  //------------------------------------------------------------------------
1176  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1177  "message handlers.", pStreamName.c_str() );
1178 
1179  SubStreamList::iterator it;
1180  OutQueue q;
1181  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1182  q.GrabItems( *(*it)->outQueue );
1183  scopedLock.UnLock();
1184 
1185  q.Report( status );
1186 
1187  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1188  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1189  }
1190 
1191  //----------------------------------------------------------------------------
1192  // On fatal error
1193  //----------------------------------------------------------------------------
1194  void Stream::OnFatalError( uint16_t subStream,
1195  XRootDStatus status,
1196  StreamMutexHelper &lock )
1197  {
1198  Log *log = DefaultEnv::GetLog();
1199  SockHandlerClose( subStream );
1200  log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
1201  pStreamName.c_str(), status.ToString().c_str() );
1202 
1203  //--------------------------------------------------------------------------
1204  // Don't set the stream error windows for authentication errors as the user
1205  // may refresh his credential at any time
1206  //--------------------------------------------------------------------------
1207  if( status.code != errAuthFailed )
1208  {
1209  pConnectionCount = 0;
1210  pLastStreamError = ::time(0);
1211  pLastFatalError = status;
1212  }
1213 
1214  SubStreamList::iterator it;
1215  OutQueue q;
1216  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1217  q.GrabItems( *(*it)->outQueue );
1218  lock.UnLock();
1219 
1220  status.status = stFatal;
1221  q.Report( status );
1222  pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1223  pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1224 
1225  }
1226 
1227  //----------------------------------------------------------------------------
1228  // Inform monitoring about disconnection
1229  //----------------------------------------------------------------------------
1230  void Stream::MonitorDisconnection( XRootDStatus status )
1231  {
1232  Monitor *mon = DefaultEnv::GetMonitor();
1233  if( mon )
1234  {
1235  Monitor::DisconnectInfo i;
1236  i.server = pUrl->GetHostId();
1237  i.rBytes = pBytesReceived;
1238  i.sBytes = pBytesSent;
1239  i.cTime = ::time(0) - pConnectionDone.tv_sec;
1240  i.status = status;
1241  mon->Event( Monitor::EvDisconnect, &i );
1242  }
1243  }
1244 
1245  //----------------------------------------------------------------------------
1246  // Call back when a message has been reconstructed
1247  //----------------------------------------------------------------------------
1248  bool Stream::OnReadTimeout( uint16_t substream )
1249  {
1250  //--------------------------------------------------------------------------
1251  // We only take the main stream into account
1252  //--------------------------------------------------------------------------
1253  if( substream != 0 )
1254  return true;
1255 
1256  //--------------------------------------------------------------------------
1257  // Check if there is no outgoing messages and if the stream TTL is elapesed.
1258  // It is assumed that the underlying transport makes sure that there is no
1259  // pending requests that are not answered, ie. all possible virtual streams
1260  // are de-allocated
1261  //--------------------------------------------------------------------------
1262  Log *log = DefaultEnv::GetLog();
1263  SubStreamList::iterator it;
1264  time_t now = time(0);
1265 
1266  bool closing;
1267  StreamMutexHelper scopedLock( pMutex, substream, closing );
1268  if( closing ) return false;
1269  uint32_t outgoingMessages = 0;
1270  time_t lastActivity = 0;
1271  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1272  {
1273  outgoingMessages += (*it)->outQueue->GetSize();
1274  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1275  if( lastActivity < sockLastActivity )
1276  lastActivity = sockLastActivity;
1277  }
1278 
1279  if( !outgoingMessages )
1280  {
1281  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1282  *pChannelData );
1283  if( disconnect )
1284  {
1285  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1286  pStreamName.c_str() );
1287  const uint64_t sess = pSessionId;
1288  scopedLock.UnLock();
1289  //----------------------------------------------------------------------
1290  // Important note!
1291  //
1292  // This destroys the Stream object itself, the underlined
1293  // AsyncSocketHandler object (that called this method) and the Channel
1294  // object that aggregates this Stream.
1295  //----------------------------------------------------------------------
1297  return false;
1298  }
1299  }
1300 
1301  //--------------------------------------------------------------------------
1302  // Check if the stream is broken
1303  //--------------------------------------------------------------------------
1304  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1305  *pChannelData );
1306  if( !st.IsOK() )
1307  {
1308  scopedLock.UnLock();
1309  OnError( substream, st );
1310  return false;
1311  }
1312  return true;
1313  }
1314 
1315  //----------------------------------------------------------------------------
1316  // Call back when a message has been reconstru
1317  //----------------------------------------------------------------------------
1318  bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1319  {
1320  return true;
1321  }
1322 
1323  //----------------------------------------------------------------------------
1324  // Register channel event handler
1325  //----------------------------------------------------------------------------
1327  {
1328  pChannelEvHandlers.AddHandler( handler );
1329  }
1330 
1331  //----------------------------------------------------------------------------
1332  // Remove a channel event handler
1333  //----------------------------------------------------------------------------
1335  {
1336  pChannelEvHandlers.RemoveHandler( handler );
1337  }
1338 
1339  //----------------------------------------------------------------------------
1340  // Install a incoming message handler
1341  //----------------------------------------------------------------------------
1342  MsgHandler*
1343  Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1344  {
1345  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1346  if( !mh.handler )
1347  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1348  mh.expires,
1349  mh.action );
1350 
1351  if( !mh.handler )
1352  return nullptr;
1353 
1354  if( mh.action & MsgHandler::Raw )
1355  return mh.handler;
1356  return nullptr;
1357  }
1358 
1359  //----------------------------------------------------------------------------
1363  //----------------------------------------------------------------------------
1364  uint16_t Stream::InspectStatusRsp( uint16_t stream,
1365  MsgHandler *&incHandler )
1366  {
1367  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1368  if( !mh.handler )
1370 
1371  uint16_t action = mh.handler->InspectStatusRsp();
1372  mh.action |= action;
1373 
1374  if( action & MsgHandler::RemoveHandler )
1375  pIncomingQueue->RemoveMessageHandler( mh.handler );
1376 
1377  if( action & MsgHandler::Raw )
1378  {
1379  incHandler = mh.handler;
1380  return MsgHandler::Raw;
1381  }
1382 
1383  if( action & MsgHandler::Corrupted )
1384  return MsgHandler::Corrupted;
1385 
1386  if( action & MsgHandler::More )
1387  return MsgHandler::More;
1388 
1389  return MsgHandler::None;
1390  }
1391 
1392  //----------------------------------------------------------------------------
1393  // Check if channel can be collapsed using given URL
1394  //----------------------------------------------------------------------------
1395  bool Stream::CanCollapse( const URL &url )
1396  {
1397  Log *log = DefaultEnv::GetLog();
1398 
1399  //--------------------------------------------------------------------------
1400  // Resolve all the addresses of the host we're supposed to connect to
1401  //--------------------------------------------------------------------------
1402  std::vector<XrdNetAddr> prefaddrs;
1403  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1404  if( !st.IsOK() )
1405  {
1406  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1407  , pStreamName.c_str(), url.GetHostName().c_str() );
1408  return false;
1409  }
1410 
1411  //--------------------------------------------------------------------------
1412  // Resolve all the addresses of the alias
1413  //--------------------------------------------------------------------------
1414  std::vector<XrdNetAddr> aliasaddrs;
1415  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1416  if( !st.IsOK() )
1417  {
1418  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1419  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1420  return false;
1421  }
1422 
1423  //--------------------------------------------------------------------------
1424  // Now check if the preferred host is part of the alias
1425  //--------------------------------------------------------------------------
1426  auto itr = prefaddrs.begin();
1427  for( ; itr != prefaddrs.end() ; ++itr )
1428  {
1429  auto itr2 = aliasaddrs.begin();
1430  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1431  if( itr->Same( &*itr2 ) ) return true;
1432  }
1433 
1434  return false;
1435  }
1436 
1437  //------------------------------------------------------------------------
1438  // Query the stream
1439  //------------------------------------------------------------------------
1440  Status Stream::Query( uint16_t query, AnyObject &result )
1441  {
1442  switch( query )
1443  {
1444  case StreamQuery::IpAddr:
1445  {
1446  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1447  return Status();
1448  }
1449 
1450  case StreamQuery::IpStack:
1451  {
1452  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1453  return Status();
1454  }
1455 
1456  case StreamQuery::HostName:
1457  {
1458  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1459  return Status();
1460  }
1461 
1462  default:
1463  return Status( stError, errQueryNotSupported );
1464  }
1465  }
1466 
1467  //----------------------------------------------------------------------------
1468  // Used under error conditions to move handlers from the out & in queue
1469  // helpers back to main out queue for the subStream or the in queue.
1470  //----------------------------------------------------------------------------
1471  void Stream::Reinsert( uint16_t subStream )
1472  {
1473  //--------------------------------------------------------------------------
1474  // Out MsgHelper
1475  //--------------------------------------------------------------------------
1476  if( pSubStreams[subStream]->outMsgHelper.msg )
1477  {
1478  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
1479  if( pIncomingQueue->HasUnsetTimeout( h.handler ) )
1480  {
1481  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1482  h.stateful );
1483  pIncomingQueue->RemoveMessageHandler(h.handler);
1484  h.handler->OnWaitingToSend( h.msg );
1485  }
1486  else
1487  {
1488  // Since the handler has been removed from the in-queue or had its
1489  // timeout assigned it must have been sent.
1490  h.handler->OnStatusReady( h.msg, XRootDStatus() );
1491  }
1492  pSubStreams[subStream]->outMsgHelper.Reset();
1493  }
1494 
1495  //--------------------------------------------------------------------------
1496  // In MsgHelper. Reset any partially read partial.
1497  //--------------------------------------------------------------------------
1498  if( pSubStreams[subStream]->inMsgHelper.handler )
1499  {
1500  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
1501  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1502  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1503  if( xrdHandler ) xrdHandler->PartialReceived();
1504  h.Reset();
1505  }
1506  }
1507 
1508  //----------------------------------------------------------------------------
1509  // Marks subStream as disconnected and closes the sockethandler.
1510  // pMutex should be locked throughout.
1511  //----------------------------------------------------------------------------
1512  void Stream::SockHandlerClose( uint16_t subStream )
1513  {
1514  SubStreamData *sd = pSubStreams[subStream];
1515  sd->status = Socket::Disconnected;
1516  pMutex.AddClosing(subStream);
1517  sd->socket->PreClose();
1518  AsyncSocketHandler *s = new AsyncSocketHandler( *sd->socket );
1519  Job *job = new SocketDestroyJob( sd->socket );
1520  pJobManager->QueueJob( job );
1521  sd->socket = s;
1522  pMutex.RemoveClosing(subStream);
1523  }
1524 }
union ServerResponse::@0 body
kXR_char streamid[2]
Definition: XProtocol.hh:956
kXR_unt16 requestid
Definition: XProtocol.hh:257
@ kXR_oksofar
Definition: XProtocol.hh:942
@ kXR_status
Definition: XProtocol.hh:949
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1304
kXR_char fhandle[4]
Definition: XProtocol.hh:258
@ kXR_close
Definition: XProtocol.hh:116
ServerResponseHeader hdr
Definition: XProtocol.hh:1330
static uint64_t swap(uint64_t x)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AssignTimeout(MsgHandler *handler)
bool HasUnsetTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
virtual void OnWaitingToSend(Message *msg)
Called to indicate the message is waiting to be sent.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual void Run(void *)
The job logic.
Definition: XrdClStream.cc:287
SocketDestroyJob(AsyncSocketHandler *socket)
Definition: XrdClStream.cc:284
A network socket.
Definition: XrdClSocket.hh:43
SocketStatus
Status of the socket.
Definition: XrdClSocket.hh:49
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
void UnLock()
UnLock.
Definition: XrdClStream.cc:238
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
Definition: XrdClStream.cc:98
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
Definition: XrdClStream.hh:119
std::list< MtxInfo >::iterator fnlistit
Definition: XrdClStream.hh:121
void Lock()
Lock. Regular, non-subStream aware recursive lock.
Definition: XrdClStream.cc:119
XrdSysCondVar mcv
Definition: XrdClStream.hh:116
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
Definition: XrdClStream.cc:108
std::map< uint16_t, size_t > mclosing
Definition: XrdClStream.hh:118
std::list< MtxInfo > mlist
Definition: XrdClStream.hh:117
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:501
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:552
Status Query(uint16_t query, AnyObject &result)
Query the stream.
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:391
Stream(const URL *url, const URL &prefer=URL())
Constructor.
Definition: XrdClStream.cc:299
std::shared_ptr< Channel > GetChannel()
Definition: XrdClStream.hh:414
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:834
void Tick(time_t now)
Definition: XrdClStream.cc:584
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:947
~Stream()
Destructor.
Definition: XrdClStream.cc:356
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:794
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:812
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:678
void OnError(uint16_t subStream, XRootDStatus status)
On error.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:752
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:375
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition: XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
Random utilities.
Definition: XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:716
static pthread_t ID(void)
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
@ kXR_PartialResult
Definition: XProtocol.hh:1293
MsgHandler * handler
Definition: XrdClStream.cc:67
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Definition: XrdClStream.cc:57
Describe a server login event.
Definition: XrdClMonitor.hh:72
std::string server
"user@host:port"
Definition: XrdClMonitor.hh:78
uint16_t streams
Number of streams.
Definition: XrdClMonitor.hh:82
timeval sTOD
gettimeofday() when login started
Definition: XrdClMonitor.hh:80
timeval eTOD
gettimeofday() when login ended
Definition: XrdClMonitor.hh:81
std::string auth
authentication protocol used or empty if none
Definition: XrdClMonitor.hh:79
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
bool IsFatal() const
Fatal error.
Definition: XrdClStatus.hh:123
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
Definition: XrdClStream.cc:89
AsyncSocketHandler * socket
Definition: XrdClStream.cc:86
OutQueue::MsgHelper outMsgHelper
Definition: XrdClStream.cc:88
Socket::SocketStatus status
Definition: XrdClStream.cc:90
static const uint16_t Auth
Transport name, returns std::string *.