XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URL * GetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a sub-stream has been connected. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pBytesSent( 0 ),
111  pBytesReceived( 0 )
112  {
113  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115 
116  std::ostringstream o;
117  o << pUrl->GetHostId();
118  pStreamName = o.str();
119 
120  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
122  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
124  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
126 
127  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
129 
130  pAddressType = Utils::String2AddressType( netStack );
131  if( pAddressType == Utils::AddressType::IPAuto )
132  {
133  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
134  if( !( stacks & XrdNetUtils::hasIP64 ) )
135  {
136  if( stacks & XrdNetUtils::hasIPv4 )
137  pAddressType = Utils::AddressType::IPv4;
138  else if( stacks & XrdNetUtils::hasIPv6 )
139  pAddressType = Utils::AddressType::IPv6;
140  }
141  }
142 
143  Log *log = DefaultEnv::GetLog();
144  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146  "Window: %d", pStreamName.c_str(), netStack.c_str(),
147  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148  }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:681
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154  {
155  Disconnect( true );
156 
157  Log *log = DefaultEnv::GetLog();
158  log->Debug( PostMasterMsg, "[%s] Destroying stream",
159  pStreamName.c_str() );
160 
161  MonitorDisconnection( XRootDStatus() );
162 
163  SubStreamList::iterator it;
164  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165  delete *it;
166  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:363

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1197 of file XrdClStream.cc.

1198  {
1199  Log *log = DefaultEnv::GetLog();
1200 
1201  //--------------------------------------------------------------------------
1202  // Resolve all the addresses of the host we're supposed to connect to
1203  //--------------------------------------------------------------------------
1204  std::vector<XrdNetAddr> prefaddrs;
1205  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1206  if( !st.IsOK() )
1207  {
1208  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1209  , pStreamName.c_str(), url.GetHostName().c_str() );
1210  return false;
1211  }
1212 
1213  //--------------------------------------------------------------------------
1214  // Resolve all the addresses of the alias
1215  //--------------------------------------------------------------------------
1216  std::vector<XrdNetAddr> aliasaddrs;
1217  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1218  if( !st.IsOK() )
1219  {
1220  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1221  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1222  return false;
1223  }
1224 
1225  //--------------------------------------------------------------------------
1226  // Now check if the preferred host is part of the alias
1227  //--------------------------------------------------------------------------
1228  auto itr = prefaddrs.begin();
1229  for( ; itr != prefaddrs.end() ; ++itr )
1230  {
1231  auto itr2 = aliasaddrs.begin();
1232  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1233  if( itr->Same( &*itr2 ) ) return true;
1234  }
1235 
1236  return false;
1237  }
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 585 of file XrdClStream.cc.

586  {
587  XrdSysMutexHelper scopedLock( pMutex );
588  Log *log = DefaultEnv::GetLog();
589 
590  if( pSubStreams[subStream]->outQueue->IsEmpty() )
591  {
592  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
593  pSubStreams[subStream]->socket->GetStreamName().c_str() );
594  pSubStreams[subStream]->socket->DisableUplink();
595  }
596  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364  {
365  XrdSysMutexHelper scopedLock( pMutex );
366  SubStreamList::iterator it;
367  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368  {
369  (*it)->socket->Close();
370  (*it)->status = Socket::Disconnected;
371  }
372  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().

+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.

Definition at line 187 of file XrdClStream.cc.

188  {
189  XrdSysMutexHelper scopedLock( pMutex );
190 
191  //--------------------------------------------------------------------------
192  // We are in the process of connecting the main stream, so we do nothing
193  // because when the main stream connection is established it will connect
194  // all the other streams
195  //--------------------------------------------------------------------------
196  if( pSubStreams[0]->status == Socket::Connecting )
197  return XRootDStatus();
198 
199  //--------------------------------------------------------------------------
200  // The main stream is connected, so we can verify whether we have
201  // the up and the down stream connected and ready to handle data.
202  // If anything is not right we fall back to stream 0.
203  //--------------------------------------------------------------------------
204  if( pSubStreams[0]->status == Socket::Connected )
205  {
206  if( pSubStreams[path.down]->status != Socket::Connected )
207  path.down = 0;
208 
209  if( pSubStreams[path.up]->status == Socket::Disconnected )
210  {
211  path.up = 0;
212  return pSubStreams[0]->socket->EnableUplink();
213  }
214 
215  if( pSubStreams[path.up]->status == Socket::Connected )
216  return pSubStreams[path.up]->socket->EnableUplink();
217 
218  return XRootDStatus();
219  }
220 
221  //--------------------------------------------------------------------------
222  // The main stream is not connected, we need to check whether enough time
223  // has passed since we last encountered an error (if any) so that we could
224  // re-attempt the connection
225  //--------------------------------------------------------------------------
226  Log *log = DefaultEnv::GetLog();
227  time_t now = ::time(0);
228 
229  if( now-pLastStreamError < pStreamErrorWindow )
230  return pLastFatalError;
231 
232  gettimeofday( &pConnectionStarted, 0 );
233  ++pConnectionCount;
234 
235  //--------------------------------------------------------------------------
236  // Resolve all the addresses of the host we're supposed to connect to
237  //--------------------------------------------------------------------------
238  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239  if( !st.IsOK() )
240  {
241  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242  "the host", pStreamName.c_str() );
243  pLastStreamError = now;
244  st.status = stFatal;
245  pLastFatalError = st;
246  return st;
247  }
248 
249  if( pPrefer.IsValid() )
250  {
251  std::vector<XrdNetAddr> addrresses;
252  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253  if( !st.IsOK() )
254  {
255  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257  }
258  else
259  {
260  std::vector<XrdNetAddr> tmp;
261  tmp.reserve( pAddresses.size() );
262  // first add all remaining addresses
263  auto itr = pAddresses.begin();
264  for( ; itr != pAddresses.end() ; ++itr )
265  {
266  if( !HasNetAddr( *itr, addrresses ) )
267  tmp.push_back( *itr );
268  }
269  // then copy all 'preferred' addresses
270  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271  // and keep the result
272  pAddresses.swap( tmp );
273  }
274  }
275 
277  pAddresses );
278 
279  while( !pAddresses.empty() )
280  {
281  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282  pAddresses.pop_back();
283  pConnectionInitTime = ::time( 0 );
284  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285  if( st.IsOK() )
286  {
287  pSubStreams[0]->status = Socket::Connecting;
288  break;
289  }
290  }
291  return st;
292  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348  {
349  XrdSysMutexHelper scopedLock( pMutex );
350  if( pSubStreams[0]->status == Socket::Connecting )
351  {
352  pSubStreams[0]->status = Socket::Disconnected;
353  XrdCl::PathID path( 0, 0 );
354  XrdCl::XRootDStatus st = EnableLink( path );
355  if( !st.IsOK() )
356  OnConnectError( 0, st );
357  }
358  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:187
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:728
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 933 of file XrdClStream.cc.

934  {
935  XrdSysMutexHelper scopedLock( pMutex );
936  Log *log = DefaultEnv::GetLog();
937  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
938  {
939  if( pSubStreams[substream]->status != Socket::Connected ) continue;
940  pSubStreams[substream]->socket->Close();
941  pSubStreams[substream]->status = Socket::Disconnected;
942 
943  if( !hush )
944  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
945  pStreamName.c_str(), status.ToString().c_str() );
946 
947  //--------------------------------------------------------------------
948  // Reinsert the stuff that we have failed to sent
949  //--------------------------------------------------------------------
950  if( pSubStreams[substream]->outMsgHelper.msg )
951  {
952  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
953  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
954  h.stateful );
955  pIncomingQueue->RemoveMessageHandler(h.handler);
956  pSubStreams[substream]->outMsgHelper.Reset();
957  }
958 
959  //--------------------------------------------------------------------
960  // Reinsert the receiving handler and reset any partially read partial
961  //--------------------------------------------------------------------
962  if( pSubStreams[substream]->inMsgHelper.handler )
963  {
964  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
965  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
966  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
967  if( xrdHandler ) xrdHandler->PartialReceived();
968  h.Reset();
969  }
970  }
971 
972  pConnectionCount = 0;
973 
974  //------------------------------------------------------------------------
975  // We're done here, unlock the stream mutex to avoid deadlocks and
976  // report the disconnection event to the handlers
977  //------------------------------------------------------------------------
978  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
979  "message handlers.", pStreamName.c_str() );
980 
981  SubStreamList::iterator it;
982  OutQueue q;
983  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
984  q.GrabItems( *(*it)->outQueue );
985  scopedLock.UnLock();
986 
987  q.Report( status );
988 
989  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
990  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
991  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl;
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172  {
173  if( !pTransport || !pPoller || !pChannelData )
174  return XRootDStatus( stError, errUninitialized );
175 
176  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177  pChannelData, 0, this );
178  pSubStreams.push_back( new SubStreamData() );
179  pSubStreams[0]->socket = s;
180  return XRootDStatus();
181  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
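an action code (MsgHandler::Raw, MsgHandler::Corrupted, MsgHandler::More or MsgHandler::None); when raw data needs to be read out, incHandler is set to the MsgHandler that should receive it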

Definition at line 1166 of file XrdClStream.cc.

1168  {
1169  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1170  if( !mh.handler )
1172 
1173  uint16_t action = mh.handler->InspectStatusRsp();
1174  mh.action |= action;
1175 
1176  if( action & MsgHandler::RemoveHandler )
1177  pIncomingQueue->RemoveMessageHandler( mh.handler );
1178 
1179  if( action & MsgHandler::Raw )
1180  {
1181  incHandler = mh.handler;
1182  return MsgHandler::Raw;
1183  }
1184 
1185  if( action & MsgHandler::Corrupted )
1186  return MsgHandler::Corrupted;
1187 
1188  if( action & MsgHandler::More )
1189  return MsgHandler::More;
1190 
1191  return MsgHandler::None;
1192  }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available. If the handler wants to be called in raw mode, it is returned.

Parameters
msg - message header
stream - stream concerned
Returns
the installed handler if it wants to be called in raw mode, nullptr otherwise

Definition at line 1145 of file XrdClStream.cc.

1146  {
1147  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1148  if( !mh.handler )
1149  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1150  mh.expires,
1151  mh.action );
1152 
1153  if( !mh.handler )
1154  return nullptr;
1155 
1156  if( mh.action & MsgHandler::Raw )
1157  return mh.handler;
1158  return nullptr;
1159  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a sub-stream has been connected.

Definition at line 623 of file XrdClStream.cc.

624  {
625  XrdSysMutexHelper scopedLock( pMutex );
626  pSubStreams[subStream]->status = Socket::Connected;
627 
628  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
629  Log *log = DefaultEnv::GetLog();
630  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
631  subStream, ipstack.c_str() );
632 
633  if( subStream == 0 )
634  {
635  pLastStreamError = 0;
636  pLastFatalError = XRootDStatus();
637  pConnectionCount = 0;
638  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
639  pSessionId = ++sSessCntGen;
640 
641  //------------------------------------------------------------------------
642  // Create the streams if they don't exist yet
643  //------------------------------------------------------------------------
644  if( pSubStreams.size() == 1 && numSub > 1 )
645  {
646  for( uint16_t i = 1; i < numSub; ++i )
647  {
648  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
649  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
650  pChannelData, i, this );
651  pSubStreams.push_back( new SubStreamData() );
652  pSubStreams[i]->socket = s;
653  }
654  }
655 
656  //------------------------------------------------------------------------
657  // Connect the extra streams, if we fail we move all the outgoing items
658  // to stream 0, we don't need to enable the uplink here, because it
659  // should be already enabled after the handshaking process is completed.
660  //------------------------------------------------------------------------
661  if( pSubStreams.size() > 1 )
662  {
663  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
664  pStreamName.c_str(), pSubStreams.size() - 1 );
665  for( size_t i = 1; i < pSubStreams.size(); ++i )
666  {
667  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
668  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
669  if( !st.IsOK() )
670  {
671  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
672  pSubStreams[i]->socket->Close();
673  }
674  else
675  {
676  pSubStreams[i]->status = Socket::Connecting;
677  }
678  }
679  }
680 
681  //------------------------------------------------------------------------
682  // Inform monitoring
683  //------------------------------------------------------------------------
684  pBytesSent = 0;
685  pBytesReceived = 0;
686  gettimeofday( &pConnectionDone, 0 );
687  Monitor *mon = DefaultEnv::GetMonitor();
688  if( mon )
689  {
690  Monitor::ConnectInfo i;
691  i.server = pUrl->GetHostId();
692  i.sTOD = pConnectionStarted;
693  i.eTOD = pConnectionDone;
694  i.streams = pSubStreams.size();
695 
696  AnyObject qryResult;
697  std::string *qryResponse = nullptr;
698  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
699  qryResult.Get( qryResponse );
700 
701  if (qryResponse) {
702  i.auth = *qryResponse;
703  delete qryResponse;
704  } else {
705  i.auth = "";
706  }
707 
708  mon->Event( Monitor::EvConnect, &i );
709  }
710 
711  //------------------------------------------------------------------------
712  // For every connected control-stream call the global on-connect handler
713  //------------------------------------------------------------------------
715  }
716  else if( pOnDataConnJob )
717  {
718  //------------------------------------------------------------------------
719  // For every connected data-stream call the on-connect handler
720  //------------------------------------------------------------------------
721  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
722  }
723  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 728 of file XrdClStream.cc.

729  {
730  XrdSysMutexHelper scopedLock( pMutex );
731  Log *log = DefaultEnv::GetLog();
732  pSubStreams[subStream]->socket->Close();
733  time_t now = ::time(0);
734 
735  //--------------------------------------------------------------------------
736  // For every connection error call the global connection error handler
737  //--------------------------------------------------------------------------
739 
740  //--------------------------------------------------------------------------
741  // If we connected subStream == 0 and cannot connect >0 then we just give
742  // up and move the outgoing messages to another queue
743  //--------------------------------------------------------------------------
744  if( subStream > 0 )
745  {
746  pSubStreams[subStream]->status = Socket::Disconnected;
747  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
748  if( pSubStreams[0]->status == Socket::Connected )
749  {
750  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
751  if( !st.IsOK() )
752  OnFatalError( 0, st, scopedLock );
753  return;
754  }
755 
756  if( pSubStreams[0]->status == Socket::Connecting )
757  return;
758 
759  OnFatalError( subStream, status, scopedLock );
760  return;
761  }
762 
763  //--------------------------------------------------------------------------
764  // Check if we still have time to try and do something in the current window
765  //--------------------------------------------------------------------------
766  time_t elapsed = now-pConnectionInitTime;
767  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
768  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
769 
770  //------------------------------------------------------------------------
771  // If we have some IP addresses left we try them
772  //------------------------------------------------------------------------
773  if( !pAddresses.empty() )
774  {
775  XRootDStatus st;
776  do
777  {
778  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
779  pAddresses.pop_back();
780  pConnectionInitTime = ::time( 0 );
781  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
782  }
783  while( !pAddresses.empty() && !st.IsOK() );
784 
785  if( !st.IsOK() )
786  OnFatalError( subStream, st, scopedLock );
787 
788  return;
789  }
790  //------------------------------------------------------------------------
791  // If we still can retry with the same host name, we sleep until the end
792  // of the connection window and try
793  //------------------------------------------------------------------------
794  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
795  && !status.IsFatal() )
796  {
797  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
798  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
799 
800  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
801  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
802  return;
803  }
804  //--------------------------------------------------------------------------
805  // We are out of the connection window, the only thing we can do here
806  // is re-resolving the host name and retrying if we still can
807  //--------------------------------------------------------------------------
808  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
809  {
810  pAddresses.clear();
811  pSubStreams[0]->status = Socket::Disconnected;
812  PathID path( 0, 0 );
813  XRootDStatus st = EnableLink( path );
814  if( !st.IsOK() )
815  OnFatalError( subStream, st, scopedLock );
816  return;
817  }
818 
819  //--------------------------------------------------------------------------
820  // Else, we fail
821  //--------------------------------------------------------------------------
822  OnFatalError( subStream, status, scopedLock );
823  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global connection error handler.
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 828 of file XrdClStream.cc.

829  {
830  XrdSysMutexHelper scopedLock( pMutex );
831  Log *log = DefaultEnv::GetLog();
832  pSubStreams[subStream]->socket->Close();
833  pSubStreams[subStream]->status = Socket::Disconnected;
834 
835  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
836  pStreamName.c_str(), subStream, status.ToString().c_str() );
837 
838  //--------------------------------------------------------------------------
839  // Reinsert the stuff that we have failed to send
840  //--------------------------------------------------------------------------
841  if( pSubStreams[subStream]->outMsgHelper.msg )
842  {
843  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
844  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
845  h.stateful );
846  pIncomingQueue->RemoveMessageHandler(h.handler);
847  pSubStreams[subStream]->outMsgHelper.Reset();
848  }
849 
850  //--------------------------------------------------------------------------
851  // Reinsert the receiving handler and reset any partially read message
852  //--------------------------------------------------------------------------
853  if( pSubStreams[subStream]->inMsgHelper.handler )
854  {
855  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
856  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
857  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
858  if( xrdHandler ) xrdHandler->PartialReceived();
859  h.Reset();
860  }
861 
862  //--------------------------------------------------------------------------
863  // We are dealing with an error of a peripheral stream. If we don't have
864  // anything to send don't bother recovering. Otherwise move the requests
865  // to stream 0 if possible.
866  //--------------------------------------------------------------------------
867  if( subStream > 0 )
868  {
869  if( pSubStreams[subStream]->outQueue->IsEmpty() )
870  return;
871 
872  if( pSubStreams[0]->status != Socket::Disconnected )
873  {
874  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
875  if( pSubStreams[0]->status == Socket::Connected )
876  {
877  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
878  if( !st.IsOK() )
879  OnFatalError( 0, st, scopedLock );
880  return;
881  }
882  }
883  OnFatalError( subStream, status, scopedLock );
884  return;
885  }
886 
887  //--------------------------------------------------------------------------
888  // If we lost the stream 0 we have lost the session, we re-enable the
889  // stream if we still have things in one of the outgoing queues, otherwise
890  // there is no point in recovering at this point.
891  //--------------------------------------------------------------------------
892  if( subStream == 0 )
893  {
894  MonitorDisconnection( status );
895 
896  SubStreamList::iterator it;
897  size_t outstanding = 0;
898  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
899  outstanding += (*it)->outQueue->GetSizeStateless();
900 
901  if( outstanding )
902  {
903  PathID path( 0, 0 );
904  XRootDStatus st = EnableLink( path );
905  if( !st.IsOK() )
906  {
907  OnFatalError( 0, st, scopedLock );
908  return;
909  }
910  }
911 
912  //------------------------------------------------------------------------
913  // We're done here, unlock the stream mutex to avoid deadlocks and
914  // report the disconnection event to the handlers
915  //------------------------------------------------------------------------
916  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
917  "message handlers.", pStreamName.c_str() );
918  OutQueue q;
919  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
920  q.GrabStateful( *(*it)->outQueue );
921  scopedLock.UnLock();
922 
923  q.Report( status );
924  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
925  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
926  return;
927  }
928  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message > msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474  {
475  msg->SetSessionId( pSessionId );
476  pBytesReceived += bytesReceived;
477 
478  MsgHandler *handler = nullptr;
479  uint16_t action = 0;
480  {
481  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482  handler = mh.handler;
483  action = mh.action;
484  mh.Reset();
485  }
486 
487  if( !IsPartial( *msg ) )
488  {
489  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490  *pChannelData );
491  if( streamAction & TransportHandler::DigestMsg )
492  return;
493 
494  if( streamAction & TransportHandler::RequestClose )
495  {
496  RequestClose( *msg );
497  return;
498  }
499  }
500 
501  Log *log = DefaultEnv::GetLog();
502 
503  //--------------------------------------------------------------------------
504  // No handler, we discard the message ...
505  //--------------------------------------------------------------------------
506  if( !handler )
507  {
508  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
510  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
512  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513  return;
514  }
515 
516  //--------------------------------------------------------------------------
517  // We have a handler, so we call the callback
518  //--------------------------------------------------------------------------
519  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
520  pStreamName.c_str(), (void*)msg.get() );
521 
523  {
524  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
525  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
526 
527  // if we are handling partial response we have to take down the timeout fence
528  if( IsPartial( *msg ) )
529  {
530  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531  if( xrdHandler ) xrdHandler->PartialReceived();
532  }
533 
534  return;
535  }
536 
537  Job *job = new HandleIncMsgJob( handler );
538  pJobManager->QueueJob( job );
539  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message msg,
uint32_t  bytesSent 
)

Definition at line 601 of file XrdClStream.cc.

604  {
605  pTransport->MessageSent( msg, subStream, bytesSent,
606  *pChannelData );
607  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
608  pBytesSent += bytesSent;
609  if( h.handler )
610  {
611  // ensure expiration time is assigned if still in queue
612  pIncomingQueue->AssignTimeout( h.handler );
613  // OnStatusReady may cause the handler to delete itself, in
614  // which case the handler or the user callback may also delete msg
615  h.handler->OnStatusReady( msg, XRootDStatus() );
616  }
617  pSubStreams[subStream]->outMsgHelper.Reset();
618  }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1050 of file XrdClStream.cc.

1051  {
1052  //--------------------------------------------------------------------------
1053  // We only take the main stream into account
1054  //--------------------------------------------------------------------------
1055  if( substream != 0 )
1056  return true;
1057 
1058  //--------------------------------------------------------------------------
1059  // Check if there are no outgoing messages and if the stream TTL has elapsed.
1060  // It is assumed that the underlying transport makes sure that there are no
1061  // pending requests that are not answered, i.e. all possible virtual streams
1062  // are de-allocated
1063  //--------------------------------------------------------------------------
1064  Log *log = DefaultEnv::GetLog();
1065  SubStreamList::iterator it;
1066  time_t now = time(0);
1067 
1068  XrdSysMutexHelper scopedLock( pMutex );
1069  uint32_t outgoingMessages = 0;
1070  time_t lastActivity = 0;
1071  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1072  {
1073  outgoingMessages += (*it)->outQueue->GetSize();
1074  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1075  if( lastActivity < sockLastActivity )
1076  lastActivity = sockLastActivity;
1077  }
1078 
1079  if( !outgoingMessages )
1080  {
1081  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1082  *pChannelData );
1083  if( disconnect )
1084  {
1085  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1086  pStreamName.c_str() );
1087  scopedLock.UnLock();
1088  //----------------------------------------------------------------------
1089  // Important note!
1090  //
1091  // This destroys the Stream object itself, the underlying
1092  // AsyncSocketHandler object (that called this method) and the Channel
1093  // object that aggregates this Stream.
1094  //
1095  // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1096  // in a Channel that was previously collapsed in a redirect.
1097  //----------------------------------------------------------------------
1099  return false;
1100  }
1101  }
1102 
1103  //--------------------------------------------------------------------------
1104  // Check if the stream is broken
1105  //--------------------------------------------------------------------------
1106  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1107  *pChannelData );
1108  if( !st.IsOK() )
1109  {
1110  scopedLock.UnLock();
1111  OnError( substream, st );
1112  return false;
1113  }
1114  return true;
1115  }
Status ForceDisconnect(const URL &url)
Shut down a channel.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:828
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 545 of file XrdClStream.cc.

546  {
547  XrdSysMutexHelper scopedLock( pMutex );
548  Log *log = DefaultEnv::GetLog();
549  if( pSubStreams[subStream]->outQueue->IsEmpty() )
550  {
551  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552  pSubStreams[subStream]->socket->GetStreamName().c_str() );
553 
554  pSubStreams[subStream]->socket->DisableUplink();
555  return std::make_pair( (Message *)0, (MsgHandler *)0 );
556  }
557 
558  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560  h.expires,
561  h.stateful );
562 
563  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
564  "from out-queue to in-queue, starting to send outgoing.",
565  pUrl->GetHostId().c_str(), (void*)h.handler,
566  h.msg->GetObfuscatedDescription().c_str() );
567 
568  scopedLock.UnLock();
569 
570  if( h.handler )
571  {
572  bool rmMsg = false;
573  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
574  if( rmMsg )
575  {
576  Log *log = DefaultEnv::GetLog();
577  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
578  pStreamName.c_str() );
579  }
580  h.handler->OnReadyToSend( h.msg );
581  }
582  return std::make_pair( h.msg, h.handler );
583  }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1120 of file XrdClStream.cc.

1121  {
1122  return true;
1123  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject result 
)

Query the stream.

Definition at line 1242 of file XrdClStream.cc.

1243  {
1244  switch( query )
1245  {
1246  case StreamQuery::IpAddr:
1247  {
1248  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1249  return Status();
1250  }
1251 
1252  case StreamQuery::IpStack:
1253  {
1254  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1255  return Status();
1256  }
1257 
1258  case StreamQuery::HostName:
1259  {
1260  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1261  return Status();
1262  }
1263 
1264  default:
1265  return Status( stError, errQueryNotSupported );
1266  }
1267  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler handler)

Register channel event handler.

Definition at line 1128 of file XrdClStream.cc.

1129  {
1130  pChannelEvHandlers.AddHandler( handler );
1131  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler handler)

Remove a channel event handler.

Definition at line 1136 of file XrdClStream.cc.

1137  {
1138  pChannelEvHandlers.RemoveHandler( handler );
1139  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message msg,
MsgHandler handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301  {
302  XrdSysMutexHelper scopedLock( pMutex );
303  Log *log = DefaultEnv::GetLog();
304 
305  //--------------------------------------------------------------------------
306  // Check the session ID and bounce if needed
307  //--------------------------------------------------------------------------
308  if( msg->GetSessionId() &&
309  (pSubStreams[0]->status != Socket::Connected ||
310  pSessionId != msg->GetSessionId()) )
311  return XRootDStatus( stError, errInvalidSession );
312 
313  //--------------------------------------------------------------------------
314  // Decide on the path to send the message
315  //--------------------------------------------------------------------------
316  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317  if( pSubStreams.size() <= path.up )
318  {
319  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320  "substream %d, using 0 instead", pStreamName.c_str(),
321  msg->GetObfuscatedDescription().c_str(), path.up );
322  path.up = 0;
323  }
324 
325  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
326  "substream %d expecting answer at %d", pStreamName.c_str(),
327  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
328 
329  //--------------------------------------------------------------------------
330  // Enable *a* path and insert the message to the right queue
331  //--------------------------------------------------------------------------
332  XRootDStatus st = EnableLink( path );
333  if( st.IsOK() )
334  {
335  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337  expires, stateful );
338  }
339  else
340  st.status = stFatal;
341  return st;
342  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378  {
379  //--------------------------------------------------------------------------
380  // Check for timed-out requests and incoming handlers
381  //--------------------------------------------------------------------------
382  pMutex.Lock();
383  OutQueue q;
384  SubStreamList::iterator it;
385  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386  q.GrabExpired( *(*it)->outQueue, now );
387  pMutex.UnLock();
388 
389  q.Report( XRootDStatus( stError, errOperationExpired ) );
390  pIncomingQueue->ReportTimeout( now );
391  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: