xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdSys/XrdSysRAtomic.hh"
33 #include "XrdNet/XrdNetAddr.hh"
34 #include <list>
35 #include <vector>
36 #include <functional>
37 #include <memory>
38 
39 namespace XrdCl
40 {
41  class Message;
42  class Channel;
43  class TransportHandler;
44  class TaskManager;
45  struct SubStreamData;
46 
47  //----------------------------------------------------------------------------
49  //----------------------------------------------------------------------------
50  class Stream
51  {
52  public:
53  //------------------------------------------------------------------------
55  //------------------------------------------------------------------------
57  {
59  Connected = 1,
60  Connecting = 2,
61  Error = 3
62  };
63 
64  //------------------------------------------------------------------------
66  //------------------------------------------------------------------------
67  Stream( const URL *url, const URL &prefer = URL() );
68 
69  //------------------------------------------------------------------------
71  //------------------------------------------------------------------------
72  ~Stream();
73 
74  //------------------------------------------------------------------------
76  //------------------------------------------------------------------------
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  MsgHandler *handler,
84  bool stateful,
85  time_t expires );
86 
87  //------------------------------------------------------------------------
89  //------------------------------------------------------------------------
90  void SetTransport( TransportHandler *transport )
91  {
92  pTransport = transport;
93  }
94 
95  //------------------------------------------------------------------------
97  //------------------------------------------------------------------------
98  void SetPoller( Poller *poller )
99  {
100  pPoller = poller;
101  }
102 
103  //------------------------------------------------------------------------
105  //------------------------------------------------------------------------
106  void SetIncomingQueue( InQueue *incomingQueue )
107  {
108  pIncomingQueue = incomingQueue;
109  }
110 
111  //------------------------------------------------------------------------
113  //------------------------------------------------------------------------
114  void SetChannelData( AnyObject *channelData )
115  {
116  pChannelData = channelData;
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  void SetTaskManager( TaskManager *taskManager )
123  {
124  pTaskManager = taskManager;
125  }
126 
127  //------------------------------------------------------------------------
129  //------------------------------------------------------------------------
130  void SetJobManager( JobManager *jobManager )
131  {
132  pJobManager = jobManager;
133  }
134 
135  //------------------------------------------------------------------------
139  //------------------------------------------------------------------------
140  XRootDStatus EnableLink( PathID &path );
141 
142  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  void Disconnect( bool force = false );
146 
147  //------------------------------------------------------------------------
150  //------------------------------------------------------------------------
151  void Tick( time_t now );
152 
153  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
156  const URL *GetURL() const
157  {
158  return pUrl;
159  }
160 
161  //------------------------------------------------------------------------
163  //------------------------------------------------------------------------
164  void ForceConnect();
165 
166  //------------------------------------------------------------------------
168  //------------------------------------------------------------------------
169  const std::string &GetName() const
170  {
171  return pStreamName;
172  }
173 
174  //------------------------------------------------------------------------
176  //------------------------------------------------------------------------
177  void DisableIfEmpty( uint16_t subStream );
178 
179  //------------------------------------------------------------------------
181  //------------------------------------------------------------------------
182  void OnIncoming( uint16_t subStream,
183  std::shared_ptr<Message> msg,
184  uint32_t bytesReceived );
185 
186  //------------------------------------------------------------------------
187  // Call when one of the sockets is ready to accept a new message
188  //------------------------------------------------------------------------
189  std::pair<Message *, MsgHandler *>
190  OnReadyToWrite( uint16_t subStream );
191 
192  //------------------------------------------------------------------------
193  // Call when a message is written to the socket
194  //------------------------------------------------------------------------
195  void OnMessageSent( uint16_t subStream,
196  Message *msg,
197  uint32_t bytesSent );
198 
199  //------------------------------------------------------------------------
201  //------------------------------------------------------------------------
202  void OnConnect( uint16_t subStream );
203 
204  //------------------------------------------------------------------------
206  //------------------------------------------------------------------------
207  void OnConnectError( uint16_t subStream, XRootDStatus status );
208 
209  //------------------------------------------------------------------------
211  //------------------------------------------------------------------------
212  void OnError( uint16_t subStream, XRootDStatus status );
213 
214  //------------------------------------------------------------------------
216  //------------------------------------------------------------------------
217  void ForceError( XRootDStatus status );
218 
219  //------------------------------------------------------------------------
221  //------------------------------------------------------------------------
222  void OnReadTimeout( uint16_t subStream );
223 
224  //------------------------------------------------------------------------
226  //------------------------------------------------------------------------
227  void OnWriteTimeout( uint16_t subStream );
228 
229  //------------------------------------------------------------------------
231  //------------------------------------------------------------------------
232  void RegisterEventHandler( ChannelEventHandler *handler );
233 
234  //------------------------------------------------------------------------
236  //------------------------------------------------------------------------
237  void RemoveEventHandler( ChannelEventHandler *handler );
238 
239  //------------------------------------------------------------------------
248  //------------------------------------------------------------------------
249  MsgHandler*
250  InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
251 
252  //------------------------------------------------------------------------
256  //------------------------------------------------------------------------
257  uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
258 
259  //------------------------------------------------------------------------
261  //------------------------------------------------------------------------
262  void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
263  {
264  XrdSysMutexHelper scopedLock( pMutex );
265  pOnDataConnJob = onConnJob;
266  }
267 
268  //------------------------------------------------------------------------
271  //------------------------------------------------------------------------
272  bool CanCollapse( const URL &url );
273 
274  //------------------------------------------------------------------------
276  //------------------------------------------------------------------------
277  Status Query( uint16_t query, AnyObject &result );
278 
279  private:
280 
281  //------------------------------------------------------------------------
283  //------------------------------------------------------------------------
284  static bool IsPartial( Message &msg );
285 
286  //------------------------------------------------------------------------
288  //------------------------------------------------------------------------
289  inline static bool HasNetAddr( const XrdNetAddr &addr,
290  std::vector<XrdNetAddr> &addresses )
291  {
292  auto itr = addresses.begin();
293  for( ; itr != addresses.end() ; ++itr )
294  {
295  if( itr->Same( &addr ) ) return true;
296  }
297 
298  return false;
299  }
300 
301  //------------------------------------------------------------------------
302  // Job handling the incoming messages
303  //------------------------------------------------------------------------
304  class HandleIncMsgJob: public Job
305  {
306  public:
307  HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
308  virtual ~HandleIncMsgJob() {};
309  virtual void Run( void* )
310  {
311  pHandler->Process();
312  delete this;
313  }
314  private:
316  };
317 
318  //------------------------------------------------------------------------
320  //------------------------------------------------------------------------
321  void OnFatalError( uint16_t subStream,
322  XRootDStatus status,
323  XrdSysMutexHelper &lock );
324 
325  //------------------------------------------------------------------------
327  //------------------------------------------------------------------------
328  void MonitorDisconnection( XRootDStatus status );
329 
330  //------------------------------------------------------------------------
332  //------------------------------------------------------------------------
334 
335  typedef std::vector<SubStreamData*> SubStreamList;
336 
337  //------------------------------------------------------------------------
338  // Data members
339  //------------------------------------------------------------------------
340  const URL *pUrl;
341  const URL pPrefer;
342  std::string pStreamName;
358  std::vector<XrdNetAddr> pAddresses;
361  uint64_t pSessionId;
362 
363  //------------------------------------------------------------------------
364  // Monitoring info
365  //------------------------------------------------------------------------
368  uint64_t pBytesSent;
369  uint64_t pBytesReceived;
370 
371  //------------------------------------------------------------------------
372  // Data stream on-connect handler
373  //------------------------------------------------------------------------
374  std::shared_ptr<Job> pOnDataConnJob;
375 
376  //------------------------------------------------------------------------
377  // Track last assigned Id across all Streams, to ensure unique sessionId
378  //------------------------------------------------------------------------
380  };
381 }
382 
383 #endif // __XRD_CL_STREAM_HH__
A synchronized queue.
Definition: XrdClJobManager.hh:50
Definition: XrdClStream.hh:304
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClAnyObject.hh:32
uint32_t pLastStreamError
Definition: XrdClStream.hh:350
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void OnFatalError(uint16_t subStream, XRootDStatus status, XrdSysMutexHelper &lock)
On fatal error - unlocks the stream.
std::vector< XrdNetAddr > pAddresses
Definition: XrdClStream.hh:358
Definition: XrdSysPthread.hh:241
Interface for socket pollers.
Definition: XrdClPoller.hh:86
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void OnWriteTimeout(uint16_t subStream)
On write timeout.
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
timeval pConnectionDone
Definition: XrdClStream.hh:367
Definition: XrdClPostMasterInterfaces.hh:268
InQueue * pIncomingQueue
Definition: XrdClStream.hh:348
In the process of being connected.
Definition: XrdClStream.hh:60
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:106
ChannelHandlerList pChannelEvHandlers
Definition: XrdClStream.hh:360
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:98
void MonitorDisconnection(XRootDStatus status)
Inform the monitoring about disconnection.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
static bool HasNetAddr(const XrdNetAddr &addr, std::vector< XrdNetAddr > &addresses)
Check if addresses contains given address.
Definition: XrdClStream.hh:289
A helper for handling channel event handlers.
Definition: XrdClChannelHandlerList.hh:33
std::vector< SubStreamData * > SubStreamList
Definition: XrdClStream.hh:335
void ForceError(XRootDStatus status)
Force error.
virtual void Run(void *)
The job logic.
Definition: XrdClStream.hh:309
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
const URL pPrefer
Definition: XrdClStream.hh:341
Definition: XrdNetAddr.hh:41
Procedure execution status.
Definition: XrdClStatus.hh:114
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:122
virtual void Process()
Definition: XrdClPostMasterInterfaces.hh:125
uint64_t pSessionId
Definition: XrdClStream.hh:361
uint16_t pConnectionWindow
Definition: XrdClStream.hh:356
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:90
AddressType
Address type.
Definition: XrdClUtils.hh:86
uint16_t pConnectionCount
Definition: XrdClStream.hh:353
Broken.
Definition: XrdClStream.hh:61
AnyObject * pChannelData
Definition: XrdClStream.hh:349
TransportHandler * pTransport
Definition: XrdClStream.hh:343
TaskManager * pTaskManager
Definition: XrdClStream.hh:345
XRootDStatus Initialize()
Initializer.
uint16_t pConnectionRetry
Definition: XrdClStream.hh:354
timeval pConnectionStarted
Definition: XrdClStream.hh:366
XrdSysRecMutex pMutex
Definition: XrdClStream.hh:347
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:309
virtual ~HandleIncMsgJob()
Definition: XrdClStream.hh:308
JobManager * pJobManager
Definition: XrdClStream.hh:346
Request status.
Definition: XrdClXRootDResponses.hh:218
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:130
Channel event handler.
Definition: XrdClPostMasterInterfaces.hh:209
Poller * pPoller
Definition: XrdClStream.hh:344
Status Query(uint16_t query, AnyObject &result)
Query the stream.
uint64_t pBytesSent
Definition: XrdClStream.hh:368
A synchronize queue for incoming data.
Definition: XrdClInQueue.hh:36
Utils::AddressType pAddressType
Definition: XrdClStream.hh:359
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
XRootDStatus pLastFatalError
Definition: XrdClStream.hh:351
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Connected.
Definition: XrdClStream.hh:59
Definition: XrdSysRAtomic.hh:25
void ForceConnect()
Force connection.
std::shared_ptr< Job > pOnDataConnJob
Definition: XrdClStream.hh:374
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:114
uint64_t pBytesReceived
Definition: XrdClStream.hh:369
SubStreamList pSubStreams
Definition: XrdClStream.hh:357
URL representation.
Definition: XrdClURL.hh:30
MsgHandler * pHandler
Definition: XrdClStream.hh:315
std::string pStreamName
Definition: XrdClStream.hh:342
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
Stream.
Definition: XrdClStream.hh:50
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:156
void Tick(time_t now)
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:169
bool CanCollapse(const URL &url)
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:33
const URL * pUrl
Definition: XrdClStream.hh:340
Stream(const URL *url, const URL &prefer=URL())
Constructor.
static RAtomic_uint64_t sSessCntGen
Definition: XrdClStream.hh:379
HandleIncMsgJob(MsgHandler *handler)
Definition: XrdClStream.hh:307
time_t pConnectionInitTime
Definition: XrdClStream.hh:355
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
XRootDStatus RequestClose(Message &resp)
Send close after an open request timed out.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:262
Definition: XrdClTaskManager.hh:75
Definition: XrdSysPthread.hh:262
static bool IsPartial(Message &msg)
Check if message is a partial response.
void OnReadTimeout(uint16_t subStream)
On read timeout.
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:56
uint16_t pStreamErrorWindow
Definition: XrdClStream.hh:352
Not connected.
Definition: XrdClStream.hh:58