XRootD
XrdClChannel.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClChannel.hh"
26 #include "XrdCl/XrdClDefaultEnv.hh"
27 #include "XrdCl/XrdClStream.hh"
28 #include "XrdCl/XrdClSocket.hh"
29 #include "XrdCl/XrdClConstants.hh"
30 #include "XrdCl/XrdClLog.hh"
33 #include "XrdSys/XrdSysPthread.hh"
34 
35 #include <ctime>
36 
37 namespace XrdCl
38 {
40  {
41  public:
42  //------------------------------------------------------------------------
43  // Constructor
44  //------------------------------------------------------------------------
45  TickGeneratorTask( XrdCl::Channel *channel, const std::string &hostId ):
46  pChannel( channel )
47  {
48  std::string name = "TickGeneratorTask for: ";
49  name += hostId;
50  SetName( name );
51  }
52 
53  //------------------------------------------------------------------------
54  // Run the task
55  //------------------------------------------------------------------------
56  time_t Run( time_t now )
57  {
58  XrdSysMutexHelper lck( pMtx );
59  if( !pChannel ) return 0;
60 
61  using namespace XrdCl;
62  pChannel->Tick( now );
63 
64  Env *env = DefaultEnv::GetEnv();
65  int timeoutResolution = DefaultTimeoutResolution;
66  env->GetInt( "TimeoutResolution", timeoutResolution );
67  return now+timeoutResolution;
68  }
69 
70  void Invalidate()
71  {
72  XrdSysMutexHelper lck( pMtx );
73  pChannel = 0;
74  }
75 
76  private:
77  XrdCl::Channel *pChannel;
78  XrdSysMutex pMtx;
79  };
80 
81  //----------------------------------------------------------------------------
82  // Constructor
83  //----------------------------------------------------------------------------
84  Channel::Channel( const URL &url,
85  Poller *poller,
86  TransportHandler *transport,
87  TaskManager *taskManager,
88  JobManager *jobManager,
89  const URL &prefurl ):
90  pUrl( url.GetHostId() ),
91  pPoller( poller ),
92  pTransport( transport ),
93  pTaskManager( taskManager ),
94  pTickGenerator( 0 ),
95  pJobManager( jobManager )
96  {
97  Env *env = DefaultEnv::GetEnv();
98  Log *log = DefaultEnv::GetLog();
99 
100  int timeoutResolution = DefaultTimeoutResolution;
101  env->GetInt( "TimeoutResolution", timeoutResolution );
102 
103  pTransport->InitializeChannel( url, pChannelData );
104  log->Debug( PostMasterMsg, "Creating new channel to: %s",
105  url.GetChannelId().c_str() );
106 
107  pUrl.SetParams( url.GetParams() );
108  pUrl.SetProtocol( url.GetProtocol() );
109 
110  //--------------------------------------------------------------------------
111  // Create the stream
112  //--------------------------------------------------------------------------
113  pStream = std::make_unique<Stream>( &pUrl, prefurl );
114  pStream->SetTransport( transport );
115  pStream->SetPoller( poller );
116  pStream->SetIncomingQueue( &pIncoming );
117  pStream->SetTaskManager( taskManager );
118  pStream->SetJobManager( jobManager );
119  pStream->SetChannelData( &pChannelData );
120  pStream->Initialize();
121 
122  //--------------------------------------------------------------------------
123  // Register the task generating timeout events
124  //--------------------------------------------------------------------------
125  pTickGenerator = new TickGeneratorTask( this, pUrl.GetChannelId() );
126  pTaskManager->RegisterTask( pTickGenerator, ::time(0)+timeoutResolution );
127  }
128 
129  //----------------------------------------------------------------------------
130  // Destructor
131  //----------------------------------------------------------------------------
133  {
134  pTickGenerator->Invalidate();
135  pStream.reset();
136  pTransport->FinalizeChannel( pChannelData );
137  }
138 
139  //----------------------------------------------------------------------------
140  // Send the message asynchronously
141  //----------------------------------------------------------------------------
143  MsgHandler *handler,
144  bool stateful,
145  time_t expires )
146 
147  {
148  return pStream->Send( msg, handler, stateful, expires );
149  }
150 
151  //----------------------------------------------------------------------------
152  // Handle a time event
153  //----------------------------------------------------------------------------
154  void Channel::Tick( time_t now )
155  {
156  pStream->Tick( now );
157  }
158 
159  //----------------------------------------------------------------------------
160  // Finalize stream
161  //----------------------------------------------------------------------------
163  {
164  pStream->Finalize();
165  }
166 
167  //----------------------------------------------------------------------------
168  // Force disconnect of all streams
169  //----------------------------------------------------------------------------
171  {
172  return ForceDisconnect(false);
173  }
174 
175  //----------------------------------------------------------------------------
176  // Force disconnect of all streams
177  //----------------------------------------------------------------------------
179  {
180  //--------------------------------------------------------------------------
181  // Disconnect the stream and all substreams
182  //--------------------------------------------------------------------------
183  pStream->ForceError( Status( stError, errOperationInterrupted ), hush, 0);
184 
185  return Status();
186  }
187 
188  //----------------------------------------------------------------------------
189  // Force disconnect of all streams. Internal version.
190  //----------------------------------------------------------------------------
191  Status Channel::ForceDisconnect( std::shared_ptr<Channel> self,
192  const uint64_t sess )
193  {
194  //--------------------------------------------------------------------------
195  // Disconnect the stream and all substreams
196  //--------------------------------------------------------------------------
197  pStream->ForceError( Status( stError, errOperationInterrupted ), false, sess );
198 
199  return Status();
200  }
201 
202  //----------------------------------------------------------------------------
203  // Force reconnect
204  //----------------------------------------------------------------------------
206  {
207  pStream->ForceConnect();
208  return Status();
209  }
210 
211  //------------------------------------------------------------------------
212  // Get the number of connected data streams
213  //------------------------------------------------------------------------
215  {
216  return XRootDTransport::NbConnectedStrm( pChannelData );
217  }
218 
219  //------------------------------------------------------------------------
220  // Set the on-connect handler for data streams
221  //------------------------------------------------------------------------
222  void Channel::SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
223  {
224  pStream->SetOnDataConnectHandler( onConnJob );
225  }
226 
227  //------------------------------------------------------------------------
228  // Check if channel can be collapsed using given URL
229  //------------------------------------------------------------------------
230  bool Channel::CanCollapse( const URL &url )
231  {
232  return pStream->CanCollapse( url );
233  }
234 
235  //------------------------------------------------------------------------
236  // Decrement file object instance count bound to this channel
237  //------------------------------------------------------------------------
239  {
240  pTransport->DecFileInstCnt( pChannelData );
241  }
242 
243  //----------------------------------------------------------------------------
244  // Query the transport handler
245  //----------------------------------------------------------------------------
246  Status Channel::QueryTransport( uint16_t query, AnyObject &result )
247  {
248  if( query < 2000 )
249  return pTransport->Query( query, result, pChannelData );
250  return pStream->Query( query, result );
251  }
252 
253  //----------------------------------------------------------------------------
254  // Register channel event handler
255  //----------------------------------------------------------------------------
257  {
258  pStream->RegisterEventHandler( handler );
259  }
260 
261  //------------------------------------------------------------------------
262  // Remove a channel event handler
263  //------------------------------------------------------------------------
265  {
266  pStream->RemoveEventHandler( handler );
267  }
268 
269  void Channel::SetSelf( std::shared_ptr<Channel> &self )
270  {
271  pSelf = self;
272  if( pStream ) pStream->SetChannel( pSelf );
273  }
274 }
uint16_t NbConnectedStrm()
Get the number of connected data streams.
Status ForceReconnect()
Force reconnect.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
~Channel()
Destructor.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
void SetSelf(std::shared_ptr< Channel > &self)
Gives us access to the shared pointer that the postmaster holds for us.
void Tick(time_t now)
Handle a time event.
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
Channel(const URL &url, Poller *poller, TransportHandler *transport, TaskManager *taskManager, JobManager *jobManager, const URL &prefurl=URL())
Definition: XrdClChannel.cc:84
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
Interface for socket pollers.
Definition: XrdClPoller.hh:88
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
void SetName(const std::string &name)
Set name of the task.
time_t Run(time_t now)
Definition: XrdClChannel.cc:56
TickGeneratorTask(XrdCl::Channel *channel, const std::string &hostId)
Definition: XrdClChannel.cc:45
Perform the handshake and the authentication for each physical stream.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual void FinalizeChannel(AnyObject &channelData)=0
Finalize channel.
virtual void InitializeChannel(const URL &url, AnyObject &channelData)=0
Initialize channel.
virtual void DecFileInstCnt(AnyObject &channelData)=0
Decrement file object instance count bound to this channel.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:512
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
void SetProtocol(const std::string &protocol)
Set protocol.
Definition: XrdClURL.hh:126
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultTimeoutResolution
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
Procedure execution status.
Definition: XrdClStatus.hh:115