XRootD
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 #include "XrdCl/XrdClUtils.hh"
30 
31 #include "XrdSys/XrdSysPthread.hh"
32 #include "XrdSys/XrdSysRAtomic.hh"
33 #include "XrdNet/XrdNetAddr.hh"
34 #include "XrdOuc/XrdOucCompiler.hh"
35 #include <list>
36 #include <vector>
37 #include <functional>
38 #include <memory>
39 
40 #include <assert.h>
41 
42 namespace XrdCl
43 {
44  class Message;
45  class Channel;
46  class TransportHandler;
47  class TaskManager;
48  struct SubStreamData;
49 
50  //----------------------------------------------------------------------------
60  //----------------------------------------------------------------------------
62  {
63  public:
64  StreamMutex(): mcv(0), hasfn(false) { }
66  {
67  assert( mlist.empty() );
68  assert( mclosing.empty() );
69  assert( mthmap.empty() );
70  assert( hasfn == false );
71  }
72 
73  //------------------------------------------------------------------------
75  //------------------------------------------------------------------------
76  void AddClosing( uint16_t subStream );
77 
78  //------------------------------------------------------------------------
80  //------------------------------------------------------------------------
81  void RemoveClosing( uint16_t subStream );
82 
83  //------------------------------------------------------------------------
85  //------------------------------------------------------------------------
86  void Lock();
87 
88  //------------------------------------------------------------------------
90  //------------------------------------------------------------------------
91  void Lock( uint16_t subStream, bool &isclosing );
92 
93  //------------------------------------------------------------------------
98  //------------------------------------------------------------------------
99  void Lock( const std::function<void()> &func, bool &isclosing );
100 
101  //------------------------------------------------------------------------
103  //------------------------------------------------------------------------
104  void UnLock();
105 
106  struct MtxInfo
107  {
108  MtxInfo(): cnt( 0 ) { }
109  MtxInfo( const std::function<void()> &func ): cnt( 0 ), fn( func ) { }
110  ~MtxInfo() { }
111 
112  size_t cnt;
113  std::function<void()> fn;
114  };
115 
117  std::list<MtxInfo> mlist;
118  std::map<uint16_t, size_t> mclosing;
119  std::map<pthread_t, std::list<MtxInfo>::iterator> mthmap;
120  bool hasfn;
121  std::list<MtxInfo>::iterator fnlistit;
122  };
123 
124  //----------------------------------------------------------------------------
128  //----------------------------------------------------------------------------
130  {
131  public:
133  {
134  mtx->Lock();
135  }
136 
137  StreamMutexHelper( StreamMutex &sm, uint16_t idx,
138  bool &isclosing ): mtx( &sm )
139  {
140  mtx->Lock( idx, isclosing );
141  if( isclosing ) mtx = nullptr;
142  }
143 
144  StreamMutexHelper( StreamMutex &sm, const std::function<void()> &func,
145  bool &isclosing ): mtx( &sm )
146  {
147  mtx->Lock( func, isclosing );
148  if( isclosing ) mtx = nullptr;
149  }
150 
152  {
153  UnLock();
154  }
155 
156  void UnLock()
157  {
158  if( !mtx ) return;
159  mtx->UnLock();
160  mtx = nullptr;
161  }
162 
164  };
165 
166  //----------------------------------------------------------------------------
168  //----------------------------------------------------------------------------
169  class Stream
170  {
171  public:
172  //------------------------------------------------------------------------
174  //------------------------------------------------------------------------
176  {
178  Connected = 1,
180  Error = 3
181  };
182 
183  //------------------------------------------------------------------------
185  //------------------------------------------------------------------------
186  Stream( const URL *url, const URL &prefer = URL() );
187 
188  //------------------------------------------------------------------------
190  //------------------------------------------------------------------------
191  ~Stream();
192 
193  //------------------------------------------------------------------------
195  //------------------------------------------------------------------------
197 
198  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
201  XRootDStatus Send( Message *msg,
202  MsgHandler *handler,
203  bool stateful,
204  time_t expires );
205 
206  //------------------------------------------------------------------------
208  //------------------------------------------------------------------------
209  void SetTransport( TransportHandler *transport )
210  {
211  pTransport = transport;
212  }
213 
214  //------------------------------------------------------------------------
216  //------------------------------------------------------------------------
217  void SetPoller( Poller *poller )
218  {
219  pPoller = poller;
220  }
221 
222  //------------------------------------------------------------------------
224  //------------------------------------------------------------------------
225  void SetIncomingQueue( InQueue *incomingQueue )
226  {
227  pIncomingQueue = incomingQueue;
228  }
229 
230  //------------------------------------------------------------------------
232  //------------------------------------------------------------------------
233  void SetChannel( std::weak_ptr<Channel> &channel )
234  {
235  pChannel = channel;
236  }
237 
238  //------------------------------------------------------------------------
240  //------------------------------------------------------------------------
241  void SetChannelData( AnyObject *channelData )
242  {
243  pChannelData = channelData;
244  }
245 
246  //------------------------------------------------------------------------
248  //------------------------------------------------------------------------
249  void SetTaskManager( TaskManager *taskManager )
250  {
251  pTaskManager = taskManager;
252  }
253 
254  //------------------------------------------------------------------------
256  //------------------------------------------------------------------------
257  void SetJobManager( JobManager *jobManager )
258  {
259  pJobManager = jobManager;
260  }
261 
262  //------------------------------------------------------------------------
266  //------------------------------------------------------------------------
267  XRootDStatus EnableLink( PathID &path );
268 
269  //------------------------------------------------------------------------
272  //------------------------------------------------------------------------
273  void Finalize();
274 
275  //------------------------------------------------------------------------
278  //------------------------------------------------------------------------
279  void Tick( time_t now );
280 
281  //------------------------------------------------------------------------
283  //------------------------------------------------------------------------
284  const URL *GetURL() const
285  {
286  return pUrl;
287  }
288 
289  //------------------------------------------------------------------------
291  //------------------------------------------------------------------------
292  void ForceConnect();
293 
294  //------------------------------------------------------------------------
296  //------------------------------------------------------------------------
297  const std::string &GetName() const
298  {
299  return pStreamName;
300  }
301 
302  //------------------------------------------------------------------------
304  //------------------------------------------------------------------------
305  void DisableIfEmpty( uint16_t subStream );
306 
307  //------------------------------------------------------------------------
309  //------------------------------------------------------------------------
310  void OnIncoming( uint16_t subStream,
311  std::shared_ptr<Message> msg,
312  uint32_t bytesReceived );
313 
314  //------------------------------------------------------------------------
315  // Call when one of the sockets is ready to accept a new message
316  //------------------------------------------------------------------------
317  std::pair<Message *, MsgHandler *>
318  OnReadyToWrite( uint16_t subStream );
319 
320  //------------------------------------------------------------------------
321  // Call when a message is written to the socket
322  //------------------------------------------------------------------------
323  void OnMessageSent( uint16_t subStream,
324  Message *msg,
325  uint32_t bytesSent );
326 
327  //------------------------------------------------------------------------
329  //------------------------------------------------------------------------
330  void OnConnect( uint16_t subStream );
331 
332  //------------------------------------------------------------------------
334  //------------------------------------------------------------------------
335  void OnConnectError( uint16_t subStream, XRootDStatus status );
336 
337  //------------------------------------------------------------------------
339  //------------------------------------------------------------------------
340  void OnError( uint16_t subStream, XRootDStatus status );
341 
342  //------------------------------------------------------------------------
344  //------------------------------------------------------------------------
345  void ForceError( XRootDStatus status, const bool hush,
346  const uint64_t sess );
347 
348  //------------------------------------------------------------------------
350  //------------------------------------------------------------------------
351  bool OnReadTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
352 
353  //------------------------------------------------------------------------
355  //------------------------------------------------------------------------
356  bool OnWriteTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
357 
358  //------------------------------------------------------------------------
360  //------------------------------------------------------------------------
361  void RegisterEventHandler( ChannelEventHandler *handler );
362 
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
366  void RemoveEventHandler( ChannelEventHandler *handler );
367 
368  //------------------------------------------------------------------------
377  //------------------------------------------------------------------------
378  MsgHandler*
379  InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
380 
381  //------------------------------------------------------------------------
385  //------------------------------------------------------------------------
386  uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
387 
388  //------------------------------------------------------------------------
390  //------------------------------------------------------------------------
391  void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
392  {
393  StreamMutexHelper scopedLock( pMutex );
394  pOnDataConnJob = onConnJob;
395  }
396 
397  //------------------------------------------------------------------------
400  //------------------------------------------------------------------------
401  bool CanCollapse( const URL &url );
402 
403  //------------------------------------------------------------------------
405  //------------------------------------------------------------------------
406  Status Query( uint16_t query, AnyObject &result );
407 
408  //------------------------------------------------------------------------
413  //------------------------------------------------------------------------
414  std::shared_ptr<Channel> GetChannel()
415  {
416  return pChannel.lock();
417  }
418 
419  private:
420 
421  //------------------------------------------------------------------------
423  //------------------------------------------------------------------------
424  static bool IsPartial( Message &msg );
425 
426  //------------------------------------------------------------------------
428  //------------------------------------------------------------------------
429  inline static bool HasNetAddr( const XrdNetAddr &addr,
430  std::vector<XrdNetAddr> &addresses )
431  {
432  auto itr = addresses.begin();
433  for( ; itr != addresses.end() ; ++itr )
434  {
435  if( itr->Same( &addr ) ) return true;
436  }
437 
438  return false;
439  }
440 
441  //------------------------------------------------------------------------
442  // Used under error conditions to move handlers from the out & in queue
443  // helpers back to main out queue for the subStream or the in queue.
444  //------------------------------------------------------------------------
445  void Reinsert( uint16_t subStream );
446 
447  //------------------------------------------------------------------------
448  // Job handling the incoming messages
449  //------------------------------------------------------------------------
450  class HandleIncMsgJob: public Job
451  {
452  public:
453  HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
454  virtual ~HandleIncMsgJob() {};
455  virtual void Run( void* )
456  {
457  pHandler->Process();
458  delete this;
459  }
460  private:
461  MsgHandler *pHandler;
462  };
463 
464  //------------------------------------------------------------------------
466  //------------------------------------------------------------------------
467  void OnFatalError( uint16_t subStream,
468  XRootDStatus status,
469  StreamMutexHelper &lock );
470 
471  //------------------------------------------------------------------------
473  //------------------------------------------------------------------------
474  void MonitorDisconnection( XRootDStatus status );
475 
476  //------------------------------------------------------------------------
478  //------------------------------------------------------------------------
479  XRootDStatus RequestClose( Message &resp );
480 
481  //------------------------------------------------------------------------
483  //------------------------------------------------------------------------
484  void SockHandlerClose( uint16_t subStream );
485 
486 
487  typedef std::vector<SubStreamData*> SubStreamList;
488 
489  //------------------------------------------------------------------------
490  // Data members
491  //------------------------------------------------------------------------
492  const URL *pUrl;
493  const URL pPrefer;
494  std::string pStreamName;
495  TransportHandler *pTransport;
496  Poller *pPoller;
497  TaskManager *pTaskManager;
498  JobManager *pJobManager;
499  StreamMutex pMutex;
500  InQueue *pIncomingQueue;
501  AnyObject *pChannelData;
502  uint32_t pLastStreamError;
503  XRootDStatus pLastFatalError;
504  uint16_t pStreamErrorWindow;
505  uint16_t pConnectionCount;
506  uint16_t pConnectionRetry;
507  time_t pConnectionInitTime;
508  uint16_t pConnectionWindow;
509  SubStreamList pSubStreams;
510  std::vector<XrdNetAddr> pAddresses;
511  Utils::AddressType pAddressType;
512  ChannelHandlerList pChannelEvHandlers;
513  uint64_t pSessionId;
514 
515  //------------------------------------------------------------------------
516  // Monitoring info
517  //------------------------------------------------------------------------
518  timeval pConnectionStarted;
519  timeval pConnectionDone;
520  std::atomic<uint64_t> pBytesSent;
521  std::atomic<uint64_t> pBytesReceived;
522 
523  //------------------------------------------------------------------------
524  // Data stream on-connect handler
525  //------------------------------------------------------------------------
526  std::shared_ptr<Job> pOnDataConnJob;
527 
528  //------------------------------------------------------------------------
529  // Track last assigned Id across all Streams, to ensure unique sessionId
530  //------------------------------------------------------------------------
531  static RAtomic_uint64_t sSessCntGen;
532 
533  //------------------------------------------------------------------------
534  // Non owning copy of the shared_ptr PostMaster creates for our Channel
535  //------------------------------------------------------------------------
536  std::weak_ptr<Channel> pChannel;
537  };
538 }
539 
540 #endif // __XRD_CL_STREAM_HH__
#define XRD_WARN_UNUSED_RESULT
A synchronized queue for incoming data.
Definition: XrdClInQueue.hh:37
A synchronized queue.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
Interface for socket pollers.
Definition: XrdClPoller.hh:88
StreamMutexHelper(StreamMutex &sm, const std::function< void()> &func, bool &isclosing)
Definition: XrdClStream.hh:144
StreamMutexHelper(StreamMutex &sm)
Definition: XrdClStream.hh:132
StreamMutexHelper(StreamMutex &sm, uint16_t idx, bool &isclosing)
Definition: XrdClStream.hh:137
void UnLock()
UnLock.
Definition: XrdClStream.cc:238
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
Definition: XrdClStream.cc:98
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
Definition: XrdClStream.hh:119
std::list< MtxInfo >::iterator fnlistit
Definition: XrdClStream.hh:121
void Lock()
Lock. Regular, non-subStream aware recursive lock.
Definition: XrdClStream.cc:119
XrdSysCondVar mcv
Definition: XrdClStream.hh:116
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
Definition: XrdClStream.cc:108
std::map< uint16_t, size_t > mclosing
Definition: XrdClStream.hh:118
std::list< MtxInfo > mlist
Definition: XrdClStream.hh:117
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:501
void SetTransport(TransportHandler *transport)
Set the transport.
Definition: XrdClStream.hh:209
StreamStatus
Status of the stream.
Definition: XrdClStream.hh:176
@ Disconnected
Not connected.
Definition: XrdClStream.hh:177
@ Error
Broken.
Definition: XrdClStream.hh:180
@ Connected
Connected.
Definition: XrdClStream.hh:178
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:179
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
Definition: XrdClStream.hh:225
void SetChannel(std::weak_ptr< Channel > &channel)
Sets a weak_ptr of our owning Channel.
Definition: XrdClStream.hh:233
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
Definition: XrdClStream.hh:217
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:552
void SetTaskManager(TaskManager *taskManager)
Set task manager.
Definition: XrdClStream.hh:249
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
Definition: XrdClStream.hh:391
void SetJobManager(JobManager *jobManager)
Set job manager.
Definition: XrdClStream.hh:257
Status Query(uint16_t query, AnyObject &result)
Query the stream.
const std::string & GetName() const
Return stream name.
Definition: XrdClStream.hh:297
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:391
Stream(const URL *url, const URL &prefer=URL())
Constructor.
Definition: XrdClStream.cc:299
std::shared_ptr< Channel > GetChannel()
Definition: XrdClStream.hh:414
void OnConnect(uint16_t subStream)
Call back when the given subStream has been connected.
Definition: XrdClStream.cc:834
void Tick(time_t now)
Definition: XrdClStream.cc:584
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:947
~Stream()
Destructor.
Definition: XrdClStream.cc:356
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:794
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:812
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:678
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
Definition: XrdClStream.hh:284
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void SetChannelData(AnyObject *channelData)
Set the channel data.
Definition: XrdClStream.hh:241
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:752
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:375
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Perform the handshake and the authentication for each physical stream.
URL representation.
Definition: XrdClURL.hh:31
AddressType
Address type.
Definition: XrdClUtils.hh:87
Procedure execution status.
Definition: XrdClStatus.hh:115
std::function< void()> fn
Definition: XrdClStream.hh:113
MtxInfo(const std::function< void()> &func)
Definition: XrdClStream.hh:109