xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdSys/XrdSysRAtomic.hh"
33 #include "XrdNet/XrdNetAddr.hh"
34 #include "XrdOuc/XrdOucCompiler.hh"
35 #include <list>
36 #include <vector>
37 #include <functional>
38 #include <memory>
39 
40 namespace XrdCl
41 {
42  class Message;
43  class Channel;
44  class TransportHandler;
45  class TaskManager;
46  struct SubStreamData;
47 
48  //----------------------------------------------------------------------------
50  //----------------------------------------------------------------------------
51  class Stream
52  {
53  public:
54  //------------------------------------------------------------------------
56  //------------------------------------------------------------------------
58  {
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
64 
65  //------------------------------------------------------------------------
67  //------------------------------------------------------------------------
68  Stream( const URL *url, const URL &prefer = URL() );
69 
70  //------------------------------------------------------------------------
72  //------------------------------------------------------------------------
73  ~Stream();
74 
75  //------------------------------------------------------------------------
77  //------------------------------------------------------------------------
79 
80  //------------------------------------------------------------------------
82  //------------------------------------------------------------------------
84  MsgHandler *handler,
85  bool stateful,
86  time_t expires );
87 
88  //------------------------------------------------------------------------
90  //------------------------------------------------------------------------
91  void SetTransport( TransportHandler *transport )
92  {
93  pTransport = transport;
94  }
95 
96  //------------------------------------------------------------------------
98  //------------------------------------------------------------------------
99  void SetPoller( Poller *poller )
100  {
101  pPoller = poller;
102  }
103 
104  //------------------------------------------------------------------------
106  //------------------------------------------------------------------------
107  void SetIncomingQueue( InQueue *incomingQueue )
108  {
109  pIncomingQueue = incomingQueue;
110  }
111 
112  //------------------------------------------------------------------------
114  //------------------------------------------------------------------------
115  void SetChannelData( AnyObject *channelData )
116  {
117  pChannelData = channelData;
118  }
119 
120  //------------------------------------------------------------------------
122  //------------------------------------------------------------------------
123  void SetTaskManager( TaskManager *taskManager )
124  {
125  pTaskManager = taskManager;
126  }
127 
128  //------------------------------------------------------------------------
130  //------------------------------------------------------------------------
131  void SetJobManager( JobManager *jobManager )
132  {
133  pJobManager = jobManager;
134  }
135 
136  //------------------------------------------------------------------------
140  //------------------------------------------------------------------------
141  XRootDStatus EnableLink( PathID &path );
142 
143  //------------------------------------------------------------------------
145  //------------------------------------------------------------------------
146  void Disconnect( bool force = false );
147 
148  //------------------------------------------------------------------------
151  //------------------------------------------------------------------------
152  void Tick( time_t now );
153 
154  //------------------------------------------------------------------------
156  //------------------------------------------------------------------------
157  const URL *GetURL() const
158  {
159  return pUrl;
160  }
161 
162  //------------------------------------------------------------------------
164  //------------------------------------------------------------------------
165  void ForceConnect();
166 
167  //------------------------------------------------------------------------
169  //------------------------------------------------------------------------
170  const std::string &GetName() const
171  {
172  return pStreamName;
173  }
174 
175  //------------------------------------------------------------------------
177  //------------------------------------------------------------------------
178  void DisableIfEmpty( uint16_t subStream );
179 
180  //------------------------------------------------------------------------
182  //------------------------------------------------------------------------
183  void OnIncoming( uint16_t subStream,
184  std::shared_ptr<Message> msg,
185  uint32_t bytesReceived );
186 
187  //------------------------------------------------------------------------
188  // Call when one of the sockets is ready to accept a new message
189  //------------------------------------------------------------------------
190  std::pair<Message *, MsgHandler *>
191  OnReadyToWrite( uint16_t subStream );
192 
193  //------------------------------------------------------------------------
194  // Call when a message is written to the socket
195  //------------------------------------------------------------------------
196  void OnMessageSent( uint16_t subStream,
197  Message *msg,
198  uint32_t bytesSent );
199 
200  //------------------------------------------------------------------------
202  //------------------------------------------------------------------------
203  void OnConnect( uint16_t subStream );
204 
205  //------------------------------------------------------------------------
207  //------------------------------------------------------------------------
208  void OnConnectError( uint16_t subStream, XRootDStatus status );
209 
210  //------------------------------------------------------------------------
212  //------------------------------------------------------------------------
213  void OnError( uint16_t subStream, XRootDStatus status );
214 
215  //------------------------------------------------------------------------
217  //------------------------------------------------------------------------
218  void ForceError( XRootDStatus status );
219 
220  //------------------------------------------------------------------------
222  //------------------------------------------------------------------------
223  bool OnReadTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
224 
225  //------------------------------------------------------------------------
227  //------------------------------------------------------------------------
228  bool OnWriteTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
229 
230  //------------------------------------------------------------------------
232  //------------------------------------------------------------------------
233  void RegisterEventHandler( ChannelEventHandler *handler );
234 
235  //------------------------------------------------------------------------
237  //------------------------------------------------------------------------
238  void RemoveEventHandler( ChannelEventHandler *handler );
239 
240  //------------------------------------------------------------------------
249  //------------------------------------------------------------------------
250  MsgHandler*
251  InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
252 
253  //------------------------------------------------------------------------
257  //------------------------------------------------------------------------
258  uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
259 
260  //------------------------------------------------------------------------
262  //------------------------------------------------------------------------
263  void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }
268 
269  //------------------------------------------------------------------------
272  //------------------------------------------------------------------------
273  bool CanCollapse( const URL &url );
274 
275  //------------------------------------------------------------------------
277  //------------------------------------------------------------------------
278  Status Query( uint16_t query, AnyObject &result );
279 
280  private:
281 
282  //------------------------------------------------------------------------
284  //------------------------------------------------------------------------
285  static bool IsPartial( Message &msg );
286 
287  //------------------------------------------------------------------------
289  //------------------------------------------------------------------------
290  inline static bool HasNetAddr( const XrdNetAddr &addr,
291  std::vector<XrdNetAddr> &addresses )
292  {
293  auto itr = addresses.begin();
294  for( ; itr != addresses.end() ; ++itr )
295  {
296  if( itr->Same( &addr ) ) return true;
297  }
298 
299  return false;
300  }
301 
302  //------------------------------------------------------------------------
303  // Job handling the incoming messages
304  //------------------------------------------------------------------------
305  class HandleIncMsgJob: public Job
306  {
307  public:
308  HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
309  virtual ~HandleIncMsgJob() {};
310  virtual void Run( void* )
311  {
312  pHandler->Process();
313  delete this;
314  }
315  private:
317  };
318 
319  //------------------------------------------------------------------------
321  //------------------------------------------------------------------------
322  void OnFatalError( uint16_t subStream,
323  XRootDStatus status,
324  XrdSysMutexHelper &lock );
325 
326  //------------------------------------------------------------------------
328  //------------------------------------------------------------------------
329  void MonitorDisconnection( XRootDStatus status );
330 
331  //------------------------------------------------------------------------
333  //------------------------------------------------------------------------
335 
336  typedef std::vector<SubStreamData*> SubStreamList;
337 
338  //------------------------------------------------------------------------
339  // Data members
340  //------------------------------------------------------------------------
341  const URL *pUrl;
342  const URL pPrefer;
343  std::string pStreamName;
359  std::vector<XrdNetAddr> pAddresses;
362  uint64_t pSessionId;
363 
364  //------------------------------------------------------------------------
365  // Monitoring info
366  //------------------------------------------------------------------------
369  uint64_t pBytesSent;
370  uint64_t pBytesReceived;
371 
372  //------------------------------------------------------------------------
373  // Data stream on-connect handler
374  //------------------------------------------------------------------------
375  std::shared_ptr<Job> pOnDataConnJob;
376 
377  //------------------------------------------------------------------------
378  // Track last assigned Id across all Streams, to ensure unique sessionId
379  //------------------------------------------------------------------------
381  };
382 }
383 
384 #endif // __XRD_CL_STREAM_HH__
A synchronized queue.
Definition: XrdClJobManager.hh:50
Definition: XrdClStream.hh:305
void OnError(uint16_t subStream, XRootDStatus status)
On error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
Definition: XrdClAnyObject.hh:32
uint32_t pLastStreamError
Definition: XrdClStream.hh:351
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:359
Definition: XrdSysPthread.hh:241
Interface for socket pollers.
Definition: XrdClPoller.hh:86
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
timeval pConnectionDone
Definition: XrdClStream.hh:368
Definition: XrdClPostMasterInterfaces.hh:268
InQueue * pIncomingQueue
Definition: XrdClStream.hh:349
In the process of being connected.
Definition: XrdClStream.hh:61
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:107
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:361
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:99
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:290
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:33
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:336
void ForceError(XRootDStatus status)
Force error.
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:310
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
const URL pPrefer
Definition: XrdClStream.hh:342
Definition: XrdNetAddr.hh:41
Procedure execution status.
Definition: XrdClStatus.hh:114
#define XRD_WARN_UNUSED_RESULT
Definition: XrdOucCompiler.hh:31
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:123
virtual void Process()
Definition: XrdClPostMasterInterfaces.hh:125
uint64_t pSessionId
Definition: XrdClStream.hh:362
uint16_t pConnectionWindow
Definition: XrdClStream.hh:357
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:91
AddressType
Address type.
Definition: XrdClUtils.hh:86
uint16_t pConnectionCount
Definition: XrdClStream.hh:354
Broken.
Definition: XrdClStream.hh:62
AnyObject * pChannelData
Definition: XrdClStream.hh:350
TransportHandler * pTransport
Definition: XrdClStream.hh:344
TaskManager * pTaskManager
Definition: XrdClStream.hh:346
XRootDStatus Initialize()
Initializer.
uint16_t pConnectionRetry
Definition: XrdClStream.hh:355
timeval pConnectionStarted
Definition: XrdClStream.hh:367
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:348
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:309
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:309
JobManager * pJobManager
Definition: XrdClStream.hh:347
Request status.
Definition: XrdClXRootDResponses.hh:218
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:131
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:209
Poller * pPoller
Definition: XrdClStream.hh:345
Status Query(uint16_t query, AnyObject &result)
Query the stream.
uint64_t pBytesSent
Definition: XrdClStream.hh:369
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:36
Utils::AddressType pAddressType
Definition: XrdClStream.hh:360
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:352
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Connected.
Definition: XrdClStream.hh:60
Definition: XrdSysRAtomic.hh:25
void ForceConnect()
Force connection.
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:375
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:115
uint64_t pBytesReceived
Definition: XrdClStream.hh:370
SubStreamList pSubStreams
Definition: XrdClStream.hh:358
URL representation.
Definition: XrdClURL.hh:30
MsgHandler * pHandler
Definition: XrdClStream.hh:316
std::string pStreamName
Definition: XrdClStream.hh:343
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
Stream.
Definition: XrdClStream.hh:51
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:157
void Tick(time_t now)
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:170
bool CanCollapse(const URL &url)
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
const URL * pUrl
Definition: XrdClStream.hh:341
Stream(const URL *url, const URL &prefer=URL())
Constructor.
static RAtomic_uint64_t sSessCntGen
Definition: XrdClStream.hh:380
HandleIncMsgJob(MsgHandler *handler)
Definition: XrdClStream.hh:308
time_t pConnectionInitTime
Definition: XrdClStream.hh:356
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
XRootDStatus RequestClose(Message &resp)
Send close after an open request timed out.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:263
Definition: XrdClTaskManager.hh:75
Definition: XrdSysPthread.hh:262
static bool IsPartial(Message &msg)
Check if message is a partial response.
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:57
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:353
Not connected.
Definition: XrdClStream.hh:59