XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
XRootDStatus EnableLink (PathID &path)
 
void Finalize ()
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, const bool hush, const uint64_t sess)
 Force error. More...
 
std::shared_ptr< Channel > GetChannel ()
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannel (std::weak_ptr< Channel > &channel)
 Sets a weak_ptr of our owning Channel. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 169 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 175 of file XrdClStream.hh.

176  {
177  Disconnected = 0,
178  Connected = 1,
179  Connecting = 2,
180  Error = 3
181  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:177
@ Error
Broken.
Definition: XrdClStream.hh:180
@ Connected
Connected.
Definition: XrdClStream.hh:178
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:179

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 299 of file XrdClStream.cc.

299  :
300  pUrl( url ),
301  pPrefer( prefer ),
302  pTransport( 0 ),
303  pPoller( 0 ),
304  pTaskManager( 0 ),
305  pJobManager( 0 ),
306  pIncomingQueue( 0 ),
307  pChannelData( 0 ),
308  pLastStreamError( 0 ),
309  pConnectionCount( 0 ),
310  pConnectionInitTime( 0 ),
311  pAddressType( Utils::IPAll ),
312  pSessionId( 0 ),
313  pBytesSent( 0 ),
314  pBytesReceived( 0 )
315  {
316  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
317  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
318 
319  std::ostringstream o;
320  o << pUrl->GetHostId();
321  pStreamName = o.str();
322 
323  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
325  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
327  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
329 
330  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
332 
333  pAddressType = Utils::String2AddressType( netStack );
334  if( pAddressType == Utils::AddressType::IPAuto )
335  {
336  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
337  if( !( stacks & XrdNetUtils::hasIP64 ) )
338  {
339  if( stacks & XrdNetUtils::hasIPv4 )
340  pAddressType = Utils::AddressType::IPv4;
341  else if( stacks & XrdNetUtils::hasIPv6 )
342  pAddressType = Utils::AddressType::IPv6;
343  }
344  }
345 
346  Log *log = DefaultEnv::GetLog();
347  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
348  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
349  "Window: %d", pStreamName.c_str(), netStack.c_str(),
350  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
351  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:716
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 356 of file XrdClStream.cc.

357  {
358  // Used to disconnect substreams here, but since we are refernce counted
359  // and connected substream hold a count, if we're here they're closed.
360 
361  Log *log = DefaultEnv::GetLog();
362  log->Debug( PostMasterMsg, "[%s] Destroying stream",
363  pStreamName.c_str() );
364 
365  MonitorDisconnection( XRootDStatus() );
366 
367  SubStreamList::iterator it;
368  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369  delete *it;
370  }

References XrdCl::Log::Debug(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1395 of file XrdClStream.cc.

1396  {
1397  Log *log = DefaultEnv::GetLog();
1398 
1399  //--------------------------------------------------------------------------
1400  // Resolve all the addresses of the host we're supposed to connect to
1401  //--------------------------------------------------------------------------
1402  std::vector<XrdNetAddr> prefaddrs;
1403  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1404  if( !st.IsOK() )
1405  {
1406  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1407  , pStreamName.c_str(), url.GetHostName().c_str() );
1408  return false;
1409  }
1410 
1411  //--------------------------------------------------------------------------
1412  // Resolve all the addresses of the alias
1413  //--------------------------------------------------------------------------
1414  std::vector<XrdNetAddr> aliasaddrs;
1415  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1416  if( !st.IsOK() )
1417  {
1418  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1419  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1420  return false;
1421  }
1422 
1423  //--------------------------------------------------------------------------
1424  // Now check if the preferred host is part of the alias
1425  //--------------------------------------------------------------------------
1426  auto itr = prefaddrs.begin();
1427  for( ; itr != prefaddrs.end() ; ++itr )
1428  {
1429  auto itr2 = aliasaddrs.begin();
1430  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1431  if( itr->Same( &*itr2 ) ) return true;
1432  }
1433 
1434  return false;
1435  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 794 of file XrdClStream.cc.

795  {
796  bool closing;
797  StreamMutexHelper scopedLock( pMutex, subStream, closing );
798  if( closing ) return;
799  Log *log = DefaultEnv::GetLog();
800 
801  if( pSubStreams[subStream]->outQueue->IsEmpty() )
802  {
803  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
804  pSubStreams[subStream]->socket->GetStreamName().c_str() );
805  pSubStreams[subStream]->socket->DisableUplink();
806  }
807  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connects if needed; otherwise makes sure that the underlying socket handler gets write readiness events. The path is updated with what has actually been enabled.

Definition at line 391 of file XrdClStream.cc.

392  {
393  StreamMutexHelper scopedLock( pMutex );
394 
395  //--------------------------------------------------------------------------
396  // We are in the process of connecting the main stream, so we do nothing
397  // because when the main stream connection is established it will connect
398  // all the other streams
399  //--------------------------------------------------------------------------
400  if( pSubStreams[0]->status == Socket::Connecting )
401  return XRootDStatus();
402 
403  //--------------------------------------------------------------------------
404  // The main stream is connected, so we can verify whether we have
405  // the up and the down stream connected and ready to handle data.
406  // If anything is not right we fall back to stream 0.
407  //--------------------------------------------------------------------------
408  if( pSubStreams[0]->status == Socket::Connected )
409  {
410  if( pSubStreams[path.down]->status != Socket::Connected )
411  path.down = 0;
412 
413  if( pSubStreams[path.up]->status == Socket::Disconnected )
414  {
415  path.up = 0;
416  return pSubStreams[0]->socket->EnableUplink();
417  }
418 
419  if( pSubStreams[path.up]->status == Socket::Connected )
420  return pSubStreams[path.up]->socket->EnableUplink();
421 
422  return XRootDStatus();
423  }
424 
425  //--------------------------------------------------------------------------
426  // The main stream is not connected, we need to check whether enough time
427  // has passed since we last encountered an error (if any) so that we could
428  // re-attempt the connection
429  //--------------------------------------------------------------------------
430  Log *log = DefaultEnv::GetLog();
431  time_t now = ::time(0);
432 
433  if( now-pLastStreamError < pStreamErrorWindow )
434  return pLastFatalError;
435 
436  gettimeofday( &pConnectionStarted, 0 );
437  ++pConnectionCount;
438 
439  //--------------------------------------------------------------------------
440  // Resolve all the addresses of the host we're supposed to connect to
441  //--------------------------------------------------------------------------
442  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
443  if( !st.IsOK() )
444  {
445  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
446  "the host", pStreamName.c_str() );
447  pLastStreamError = now;
448  st.status = stFatal;
449  pLastFatalError = st;
450  return st;
451  }
452 
453  if( pPrefer.IsValid() )
454  {
455  std::vector<XrdNetAddr> addrresses;
456  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
457  if( !st.IsOK() )
458  {
459  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
460  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
461  }
462  else
463  {
464  std::vector<XrdNetAddr> tmp;
465  tmp.reserve( pAddresses.size() );
466  // first add all remaining addresses
467  auto itr = pAddresses.begin();
468  for( ; itr != pAddresses.end() ; ++itr )
469  {
470  if( !HasNetAddr( *itr, addrresses ) )
471  tmp.push_back( *itr );
472  }
473  // then copy all 'preferred' addresses
474  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
475  // and keep the result
476  pAddresses.swap( tmp );
477  }
478  }
479 
481  pAddresses );
482 
483  while( !pAddresses.empty() )
484  {
485  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
486  pAddresses.pop_back();
487  pConnectionInitTime = ::time( 0 );
488  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
489  if( st.IsOK() )
490  {
491  pSubStreams[0]->status = Socket::Connecting;
492  break;
493  }
494  }
495  return st;
496  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Finalize()

void XrdCl::Stream::Finalize ( )

Used at finalize time, disconnects the stream. Assumes poller and jobmanager are not running.

Definition at line 568 of file XrdClStream.cc.

569  {
570  auto channel = GetChannel();
571  StreamMutexHelper scopedLock( pMutex );
572  SubStreamList::iterator it;
573  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
574  {
575  (*it)->socket->Close();
576  (*it)->status = Socket::Disconnected;
577  }
578  pSessionId = 0;
579  }
std::shared_ptr< Channel > GetChannel()
Definition: XrdClStream.hh:414

References XrdCl::Socket::Disconnected, and GetChannel().

+ Here is the call graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 552 of file XrdClStream.cc.

553  {
554  StreamMutexHelper scopedLock( pMutex );
555  if( pSubStreams[0]->status == Socket::Connecting )
556  {
557  pSubStreams[0]->status = Socket::Disconnected;
558  XrdCl::PathID path( 0, 0 );
559  XrdCl::XRootDStatus st = EnableLink( path );
560  if( !st.IsOK() )
561  OnConnectError( 0, st );
562  }
563  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:391
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:947
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

+ Here is the call graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
const bool  hush,
const uint64_t  sess 
)

Force error.

Definition at line 1141 of file XrdClStream.cc.

1142  {
1143  auto channel = GetChannel();
1144  bool closing;
1145  StreamMutexHelper scopedLock( pMutex,
1146  [this, channel, status, hush, sess]()
1147  {
1148  this->ForceError(status, hush, sess);
1149  }, closing );
1150  if( closing ) return;
1151  if( sess && sess != pSessionId ) return;
1152 
1153  Log *log = DefaultEnv::GetLog();
1154  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1155  {
1156  if( pSubStreams[substream]->status != Socket::Connected ) continue;
1157  SockHandlerClose( substream );
1158 
1159  if( !hush )
1160  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1161  pStreamName.c_str(), status.ToString().c_str() );
1162 
1163  //--------------------------------------------------------------------
1164  // Reinsert the stuff that we have failed to sent
1165  //--------------------------------------------------------------------
1166  Reinsert( substream );
1167  }
1168 
1169  pConnectionCount = 0;
1170  pSessionId = 0;
1171 
1172  //------------------------------------------------------------------------
1173  // We're done here, unlock the stream mutex to avoid deadlocks and
1174  // report the disconnection event to the handlers
1175  //------------------------------------------------------------------------
1176  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1177  "message handlers.", pStreamName.c_str() );
1178 
1179  SubStreamList::iterator it;
1180  OutQueue q;
1181  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1182  q.GrabItems( *(*it)->outQueue );
1183  scopedLock.UnLock();
1184 
1185  q.Report( status );
1186 
1187  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1188  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1189  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::ChannelEventHandler::StreamBroken, and XrdCl::Status::ToString().

Referenced by XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetChannel()

std::shared_ptr<Channel> XrdCl::Stream::GetChannel ( )
inline

Gets a shared_ptr of our owning Channel. Used by our AsyncSocketHandlers to obtain a ref count, and internally to ensure the Channel (and thus ourselves) remains alive during error-event handlers that close sockets.

Definition at line 414 of file XrdClStream.hh.

415  {
416  return pChannel.lock();
417  }

Referenced by XrdCl::AsyncSocketHandler::Connect(), Finalize(), ForceError(), OnConnect(), OnConnectError(), OnError(), and OnReadTimeout().

+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 297 of file XrdClStream.hh.

298  {
299  return pStreamName;
300  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 284 of file XrdClStream.hh.

285  {
286  return pUrl;
287  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 375 of file XrdClStream.cc.

376  {
377  if( !pTransport || !pPoller || !pChannelData )
378  return XRootDStatus( stError, errUninitialized );
379 
380  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
381  pChannelData, 0, this );
382  pSubStreams.push_back( new SubStreamData() );
383  pSubStreams[0]->socket = s;
384  return XRootDStatus();
385  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
the action to take; when it is MsgHandler::Raw, incHandler is set to the MsgHandler to be used for reading out the raw data

Definition at line 1364 of file XrdClStream.cc.

1366  {
1367  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1368  if( !mh.handler )
1370 
1371  uint16_t action = mh.handler->InspectStatusRsp();
1372  mh.action |= action;
1373 
1374  if( action & MsgHandler::RemoveHandler )
1375  pIncomingQueue->RemoveMessageHandler( mh.handler );
1376 
1377  if( action & MsgHandler::Raw )
1378  {
1379  incHandler = mh.handler;
1380  return MsgHandler::Raw;
1381  }
1382 
1383  if( action & MsgHandler::Corrupted )
1384  return MsgHandler::Corrupted;
1385 
1386  if( action & MsgHandler::More )
1387  return MsgHandler::More;
1388 
1389  return MsgHandler::None;
1390  }
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available. If the handler wants to be called in the raw mode it will be returned; the message ownership flag is returned in any case.

Parameters
msg: message header
stream: stream concerned
Returns
the handler if it wants to be called in the raw mode, nullptr otherwise

Definition at line 1343 of file XrdClStream.cc.

1344  {
1345  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1346  if( !mh.handler )
1347  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1348  mh.expires,
1349  mh.action );
1350 
1351  if( !mh.handler )
1352  return nullptr;
1353 
1354  if( mh.action & MsgHandler::Raw )
1355  return mh.handler;
1356  return nullptr;
1357  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a substream has been connected.

Definition at line 834 of file XrdClStream.cc.

835  {
836  auto channel = GetChannel();
837  bool closing;
838  StreamMutexHelper scopedLock( pMutex, subStream, closing );
839  if( closing ) return;
840  pSubStreams[subStream]->status = Socket::Connected;
841 
842  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
843  Log *log = DefaultEnv::GetLog();
844  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
845  subStream, ipstack.c_str() );
846 
847  if( subStream == 0 )
848  {
849  pLastStreamError = 0;
850  pLastFatalError = XRootDStatus();
851  pConnectionCount = 0;
852  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
853  pSessionId = ++sSessCntGen;
854 
855  //------------------------------------------------------------------------
856  // Create the streams if they don't exist yet
857  //------------------------------------------------------------------------
858  if( pSubStreams.size() == 1 && numSub > 1 )
859  {
860  for( uint16_t i = 1; i < numSub; ++i )
861  {
862  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
863  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
864  pChannelData, i, this );
865  pSubStreams.push_back( new SubStreamData() );
866  pSubStreams[i]->socket = s;
867  }
868  }
869 
870  //------------------------------------------------------------------------
871  // Connect the extra streams, if we fail we move all the outgoing items
872  // to stream 0, we don't need to enable the uplink here, because it
873  // should be already enabled after the handshaking process is completed.
874  //------------------------------------------------------------------------
875  if( pSubStreams.size() > 1 )
876  {
877  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
878  pStreamName.c_str(), pSubStreams.size() - 1 );
879  for( size_t i = 1; i < pSubStreams.size(); ++i )
880  {
881  if( pSubStreams[i]->status != Socket::Disconnected )
882  {
883  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
884  SockHandlerClose( i );
885  }
886  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
887  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
888  if( !st.IsOK() )
889  {
890  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
891  SockHandlerClose( i );
892  }
893  else
894  {
895  pSubStreams[i]->status = Socket::Connecting;
896  }
897  }
898  }
899 
900  //------------------------------------------------------------------------
901  // Inform monitoring
902  //------------------------------------------------------------------------
903  pBytesSent = 0;
904  pBytesReceived = 0;
905  gettimeofday( &pConnectionDone, 0 );
906  Monitor *mon = DefaultEnv::GetMonitor();
907  if( mon )
908  {
909  Monitor::ConnectInfo i;
910  i.server = pUrl->GetHostId();
911  i.sTOD = pConnectionStarted;
912  i.eTOD = pConnectionDone;
913  i.streams = pSubStreams.size();
914 
915  AnyObject qryResult;
916  std::string *qryResponse = nullptr;
917  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
918  qryResult.Get( qryResponse );
919 
920  if (qryResponse) {
921  i.auth = *qryResponse;
922  delete qryResponse;
923  } else {
924  i.auth = "";
925  }
926 
927  mon->Event( Monitor::EvConnect, &i );
928  }
929 
930  //------------------------------------------------------------------------
931  // For every connected control-stream call the global on-connect handler
932  //------------------------------------------------------------------------
934  }
935  else if( pOnDataConnJob )
936  {
937  //------------------------------------------------------------------------
938  // For every connected data-stream call the on-connect handler
939  //------------------------------------------------------------------------
940  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
941  }
942  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), GetChannel(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 947 of file XrdClStream.cc.

948  {
949  auto channel = GetChannel();
950  bool closing;
951  StreamMutexHelper scopedLock( pMutex, subStream, closing );
952  if( closing ) return;
953  Log *log = DefaultEnv::GetLog();
954  SockHandlerClose( subStream );
955  time_t now = ::time(0);
956 
957  //--------------------------------------------------------------------------
958  // For every connection error call the global connection error handler
959  //--------------------------------------------------------------------------
961 
962  //--------------------------------------------------------------------------
963  // If we connected subStream == 0 and cannot connect >0 then we just give
964  // up and move the outgoing messages to another queue
965  //--------------------------------------------------------------------------
966  if( subStream > 0 )
967  {
968  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
969  if( pSubStreams[0]->status == Socket::Connected )
970  {
971  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
972  if( !st.IsOK() )
973  OnFatalError( 0, st, scopedLock );
974  return;
975  }
976 
977  if( pSubStreams[0]->status == Socket::Connecting )
978  return;
979 
980  OnFatalError( subStream, status, scopedLock );
981  return;
982  }
983 
984  //--------------------------------------------------------------------------
985  // Check if we still have time to try and do something in the current window
986  //--------------------------------------------------------------------------
987  time_t elapsed = now-pConnectionInitTime;
988  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
989  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
990 
991  //------------------------------------------------------------------------
992  // If we have some IP addresses left we try them
993  //------------------------------------------------------------------------
994  if( !pAddresses.empty() )
995  {
996  XRootDStatus st;
997  do
998  {
999  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
1000  pAddresses.pop_back();
1001  pConnectionInitTime = ::time( 0 );
1002  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
1003  }
1004  while( !pAddresses.empty() && !st.IsOK() );
1005 
1006  if( !st.IsOK() )
1007  OnFatalError( subStream, st, scopedLock );
1008  else
1009  pSubStreams[0]->status = Socket::Connecting;
1010 
1011  return;
1012  }
1013  //------------------------------------------------------------------------
1014  // If we still can retry with the same host name, we sleep until the end
1015  // of the connection window and try
1016  //------------------------------------------------------------------------
1017  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
1018  && !status.IsFatal() )
1019  {
1020  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
1021  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
1022 
1023  pSubStreams[0]->status = Socket::Connecting;
1024  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
1025  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
1026  return;
1027  }
1028  //--------------------------------------------------------------------------
1029  // We are out of the connection window, the only thing we can do here
1030  // is re-resolving the host name and retrying if we still can
1031  //--------------------------------------------------------------------------
1032  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
1033  {
1034  pAddresses.clear();
1035  pSubStreams[0]->status = Socket::Disconnected;
1036  PathID path( 0, 0 );
1037  XRootDStatus st = EnableLink( path );
1038  if( !st.IsOK() )
1039  OnFatalError( subStream, st, scopedLock );
1040  return;
1041  }
1042 
1043  //--------------------------------------------------------------------------
1044  // Else, we fail
1045  //--------------------------------------------------------------------------
1046  OnFatalError( subStream, status, scopedLock );
1047  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 1052 of file XrdClStream.cc.

1053  {
1054  auto channel = GetChannel();
1055  bool closing;
1056  StreamMutexHelper scopedLock( pMutex, subStream, closing );
1057  if( closing ) return;
1058  Log *log = DefaultEnv::GetLog();
1059  SockHandlerClose( subStream );
1060 
1061  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
1062  pStreamName.c_str(), subStream, status.ToString().c_str() );
1063 
1064  //--------------------------------------------------------------------------
1065  // Reinsert the stuff that we have failed to send
1066  //--------------------------------------------------------------------------
1067  Reinsert( subStream );
1068 
1069  //--------------------------------------------------------------------------
1070  // We are dealing with an error of a peripheral stream. If we don't have
1071  // anything to send don't bother recovering. Otherwise move the requests
1072  // to stream 0 if possible.
1073  //--------------------------------------------------------------------------
1074  if( subStream > 0 )
1075  {
1076  if( pSubStreams[subStream]->outQueue->IsEmpty() )
1077  return;
1078 
1079  if( pSubStreams[0]->status != Socket::Disconnected )
1080  {
1081  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
1082  if( pSubStreams[0]->status == Socket::Connected )
1083  {
1084  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
1085  if( !st.IsOK() )
1086  OnFatalError( 0, st, scopedLock );
1087  return;
1088  }
1089  }
1090  OnFatalError( subStream, status, scopedLock );
1091  return;
1092  }
1093 
1094  //--------------------------------------------------------------------------
1095  // If we lost the stream 0 we have lost the session, we re-enable the
1096  // stream if we still have things in one of the outgoing queues, otherwise
1097  // there is no point in recovering at this point.
1098  //--------------------------------------------------------------------------
1099  if( subStream == 0 )
1100  {
1101  MonitorDisconnection( status );
1102  pSessionId = 0;
1103 
1104  SubStreamList::iterator it;
1105  size_t outstanding = 0;
1106  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1107  outstanding += (*it)->outQueue->GetSizeStateless();
1108 
1109  if( outstanding )
1110  {
1111  PathID path( 0, 0 );
1112  XRootDStatus st = EnableLink( path );
1113  if( !st.IsOK() )
1114  {
1115  OnFatalError( 0, st, scopedLock );
1116  return;
1117  }
1118  }
1119 
1120  //------------------------------------------------------------------------
1121  // We're done here, unlock the stream mutex to avoid deadlocks and
1122  // report the disconnection event to the handlers
1123  //------------------------------------------------------------------------
1124  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1125  "message handlers.", pStreamName.c_str() );
1126  OutQueue q;
1127  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1128  q.GrabStateful( *(*it)->outQueue );
1129  scopedLock.UnLock();
1130 
1131  q.Report( status );
1132  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1133  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1134  return;
1135  }
1136  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::Status::IsOK(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdCl::StreamMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message >  msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 678 of file XrdClStream.cc.

681  {
682  msg->SetSessionId( pSessionId );
683  pBytesReceived += bytesReceived;
684 
685  MsgHandler *handler = nullptr;
686  uint16_t action = 0;
687  {
688  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
689  handler = mh.handler;
690  action = mh.action;
691  mh.Reset();
692  }
693 
694  if( !IsPartial( *msg ) )
695  {
696  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
697  *pChannelData );
698  if( streamAction & TransportHandler::DigestMsg )
699  return;
700 
701  if( streamAction & TransportHandler::RequestClose )
702  {
703  RequestClose( *msg );
704  return;
705  }
706  }
707 
708  Log *log = DefaultEnv::GetLog();
709 
710  //--------------------------------------------------------------------------
711  // No handler, we discard the message ...
712  //--------------------------------------------------------------------------
713  if( !handler )
714  {
715  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
716  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
717  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
718  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
719  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
720  return;
721  }
722 
723  //--------------------------------------------------------------------------
724  // We have a handler, so we call the callback
725  //--------------------------------------------------------------------------
726  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
727  pStreamName.c_str(), (void*)msg.get() );
728 
730  {
731  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
732  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
733 
734  // if we are handling partial response we have to take down the timeout fence
735  if( IsPartial( *msg ) )
736  {
737  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
738  if( xrdHandler ) xrdHandler->PartialReceived();
739  }
740 
741  return;
742  }
743 
744  Job *job = new HandleIncMsgJob( handler );
745  pJobManager->QueueJob( job );
746  }
kXR_char streamid[2]
Definition: XProtocol.hh:956
ServerResponseHeader hdr
Definition: XProtocol.hh:1330
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message *  msg,
uint32_t  bytesSent 
)

Definition at line 812 of file XrdClStream.cc.

815  {
816  pTransport->MessageSent( msg, subStream, bytesSent,
817  *pChannelData );
818  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
819  pBytesSent += bytesSent;
820  if( h.handler )
821  {
822  // ensure expiration time is assigned if still in queue
823  pIncomingQueue->AssignTimeout( h.handler );
824  // OnStatusReady may cause the handler to delete itself, in
825  // which case the handler or the user callback may also delete msg
826  h.handler->OnStatusReady( msg, XRootDStatus() );
827  }
828  pSubStreams[subStream]->outMsgHelper.Reset();
829  }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1248 of file XrdClStream.cc.

1249  {
1250  //--------------------------------------------------------------------------
1251  // We only take the main stream into account
1252  //--------------------------------------------------------------------------
1253  if( substream != 0 )
1254  return true;
1255 
1256  //--------------------------------------------------------------------------
1257  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1258  // It is assumed that the underlying transport makes sure that there is no
1259  // pending requests that are not answered, ie. all possible virtual streams
1260  // are de-allocated
1261  //--------------------------------------------------------------------------
1262  Log *log = DefaultEnv::GetLog();
1263  SubStreamList::iterator it;
1264  time_t now = time(0);
1265 
1266  bool closing;
1267  StreamMutexHelper scopedLock( pMutex, substream, closing );
1268  if( closing ) return false;
1269  uint32_t outgoingMessages = 0;
1270  time_t lastActivity = 0;
1271  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1272  {
1273  outgoingMessages += (*it)->outQueue->GetSize();
1274  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1275  if( lastActivity < sockLastActivity )
1276  lastActivity = sockLastActivity;
1277  }
1278 
1279  if( !outgoingMessages )
1280  {
1281  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1282  *pChannelData );
1283  if( disconnect )
1284  {
1285  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1286  pStreamName.c_str() );
1287  const uint64_t sess = pSessionId;
1288  scopedLock.UnLock();
1289  //----------------------------------------------------------------------
1290  // Important note!
1291  //
1292  // This destroys the Stream object itself, the underlying
1293  // AsyncSocketHandler object (that called this method) and the Channel
1294  // object that aggregates this Stream.
1295  //----------------------------------------------------------------------
1297  return false;
1298  }
1299  }
1300 
1301  //--------------------------------------------------------------------------
1302  // Check if the stream is broken
1303  //--------------------------------------------------------------------------
1304  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1305  *pChannelData );
1306  if( !st.IsOK() )
1307  {
1308  scopedLock.UnLock();
1309  OnError( substream, st );
1310  return false;
1311  }
1312  return true;
1313  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), GetChannel(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdCl::StreamMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 752 of file XrdClStream.cc.

753  {
754  bool closing;
755  StreamMutexHelper scopedLock( pMutex, subStream, closing );
756  if( closing ) return std::make_pair( (Message *)0, (MsgHandler *)0 );
757  Log *log = DefaultEnv::GetLog();
758  if( pSubStreams[subStream]->outQueue->IsEmpty() )
759  {
760  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
761  pSubStreams[subStream]->socket->GetStreamName().c_str() );
762 
763  pSubStreams[subStream]->socket->DisableUplink();
764  return std::make_pair( (Message *)0, (MsgHandler *)0 );
765  }
766 
767  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
768  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
769  h.expires,
770  h.stateful );
771 
772  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
773  "from out-queue to in-queue, starting to send outgoing.",
774  pUrl->GetHostId().c_str(), (void*)h.handler,
775  h.msg->GetObfuscatedDescription().c_str() );
776 
777  scopedLock.UnLock();
778 
779  if( h.handler )
780  {
781  bool rmMsg = false;
782  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
783  if( rmMsg )
784  {
785  Log *log = DefaultEnv::GetLog();
786  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
787  pStreamName.c_str() );
788  }
789  h.handler->OnReadyToSend( h.msg );
790  }
791  return std::make_pair( h.msg, h.handler );
792  }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdCl::StreamMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1318 of file XrdClStream.cc.

1319  {
1320  return true;
1321  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject &  result 
)

Query the stream.

Definition at line 1440 of file XrdClStream.cc.

1441  {
1442  switch( query )
1443  {
1444  case StreamQuery::IpAddr:
1445  {
1446  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1447  return Status();
1448  }
1449 
1450  case StreamQuery::IpStack:
1451  {
1452  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1453  return Status();
1454  }
1455 
1456  case StreamQuery::HostName:
1457  {
1458  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1459  return Status();
1460  }
1461 
1462  default:
1463  return Status( stError, errQueryNotSupported );
1464  }
1465  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler *  handler)

Register channel event handler.

Definition at line 1326 of file XrdClStream.cc.

1327  {
1328  pChannelEvHandlers.AddHandler( handler );
1329  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

+ Here is the call graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler *  handler)

Remove a channel event handler.

Definition at line 1334 of file XrdClStream.cc.

1335  {
1336  pChannelEvHandlers.RemoveHandler( handler );
1337  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

+ Here is the call graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message *  msg,
MsgHandler *  handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 501 of file XrdClStream.cc.

505  {
506  StreamMutexHelper scopedLock( pMutex );
507  Log *log = DefaultEnv::GetLog();
508 
509  //--------------------------------------------------------------------------
510  // Check the session ID and bounce if needed
511  //--------------------------------------------------------------------------
512  if( msg->GetSessionId() &&
513  (pSubStreams[0]->status != Socket::Connected ||
514  pSessionId != msg->GetSessionId()) )
515  return XRootDStatus( stError, errInvalidSession );
516 
517  //--------------------------------------------------------------------------
518  // Decide on the path to send the message
519  //--------------------------------------------------------------------------
520  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
521  if( pSubStreams.size() <= path.up )
522  {
523  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
524  "substream %d, using 0 instead", pStreamName.c_str(),
525  msg->GetObfuscatedDescription().c_str(), path.up );
526  path.up = 0;
527  }
528 
529  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
530  "substream %d expecting answer at %d", pStreamName.c_str(),
531  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
532 
533  //--------------------------------------------------------------------------
534  // Enable *a* path and insert the message to the right queue
535  //--------------------------------------------------------------------------
536  XRootDStatus st = EnableLink( path );
537  if( st.IsOK() )
538  {
539  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
540  handler->OnWaitingToSend( msg );
541  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
542  expires, stateful );
543  }
544  else
545  st.status = stFatal;
546  return st;
547  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::MsgHandler::OnWaitingToSend(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ SetChannel()

void XrdCl::Stream::SetChannel ( std::weak_ptr< Channel > &  channel)
inline

Sets a weak_ptr of our owning Channel.

Definition at line 233 of file XrdClStream.hh.

234  {
235  pChannel = channel;
236  }

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject *  channelData)
inline

Set the channel data.

Definition at line 241 of file XrdClStream.hh.

242  {
243  pChannelData = channelData;
244  }

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue *  incomingQueue)
inline

Set the incoming queue.

Definition at line 225 of file XrdClStream.hh.

226  {
227  pIncomingQueue = incomingQueue;
228  }

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager *  jobManager)
inline

Set job manager.

Definition at line 257 of file XrdClStream.hh.

258  {
259  pJobManager = jobManager;
260  }

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 391 of file XrdClStream.hh.

392  {
393  StreamMutexHelper scopedLock( pMutex );
394  pOnDataConnJob = onConnJob;
395  }

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller *  poller)
inline

Set the poller.

Definition at line 217 of file XrdClStream.hh.

218  {
219  pPoller = poller;
220  }

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager *  taskManager)
inline

Set task manager.

Definition at line 249 of file XrdClStream.hh.

250  {
251  pTaskManager = taskManager;
252  }

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler *  transport)
inline

Set the transport.

Definition at line 209 of file XrdClStream.hh.

210  {
211  pTransport = transport;
212  }

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by a socket timeout or by the task manager.

Definition at line 584 of file XrdClStream.cc.

585  {
586  //--------------------------------------------------------------------------
587  // Check for timed-out requests and incoming handlers
588  //--------------------------------------------------------------------------
589  StreamMutexHelper scopedLock( pMutex );
590  OutQueue q;
591  SubStreamList::iterator it;
592  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
593  q.GrabExpired( *(*it)->outQueue, now );
594  scopedLock.UnLock();
595 
596  q.Report( XRootDStatus( stError, errOperationExpired ) );
597  pIncomingQueue->ReportTimeout( now );
598  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdCl::StreamMutexHelper::UnLock().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: