XRootD
XrdClPostMaster.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClPoller.hh"
27 #include "XrdCl/XrdClJobManager.hh"
29 #include "XrdCl/XrdClChannel.hh"
30 #include "XrdCl/XrdClConstants.hh"
31 #include "XrdCl/XrdClLog.hh"
33 
34 #include "XrdSys/XrdSysPthread.hh"
35 
36 namespace XrdCl
37 {
38  struct ConnErrJob : public Job
39  {
41  std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
42  status( status ),
43  handler( handler )
44  {
45  }
46 
47  void Run( void *arg )
48  {
49  handler( url, status );
50  delete this;
51  }
52 
55  std::function<void( const URL&, const XRootDStatus& )> handler;
56  };
57 
59  {
60  PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
61  {
62  Env *env = DefaultEnv::GetEnv();
63  int workerThreads = DefaultWorkerThreads;
64  env->GetInt( "WorkerThreads", workerThreads );
65 
66  pTaskManager = new TaskManager();
67  pJobManager = new JobManager(workerThreads);
68  }
69 
71  {
72  delete pPoller;
73  delete pTaskManager;
74  delete pJobManager;
75  }
76 
77  typedef std::map<std::string, Channel*> ChannelMap;
78  typedef std::map<const URL*, Channel*> CollapsedMap;
79 
84 
85  // lock MapMutex if accessing maps while holding pDisconnectLock Read
86  // if MapMutex is required, acquire after pDisconnectLock.
88 
90  bool pRunning;
92 
94  std::unique_ptr<Job> pOnConnJob;
95  std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
96 
97  // take DisconnectLock: Read while using Channel* from map.
98  // Write if destroying Channel
99  // if MapMutex is required, acquire DisconnectLock first.
101  };
102 
103  //----------------------------------------------------------------------------
104  // Constructor
105  //----------------------------------------------------------------------------
107  {
108  }
109 
110  //----------------------------------------------------------------------------
111  // Destructor
112  //----------------------------------------------------------------------------
114  {
115  }
116 
117  //----------------------------------------------------------------------------
118  // Initializer
119  //----------------------------------------------------------------------------
121  {
122  Env *env = DefaultEnv::GetEnv();
123  std::string pollerPref = DefaultPollerPreference;
124  env->GetString( "PollerPreference", pollerPref );
125 
126  pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
127 
128  if( !pImpl->pPoller )
129  return false;
130 
131  bool st = pImpl->pPoller->Initialize();
132 
133  if( !st )
134  {
135  delete pImpl->pPoller;
136  return false;
137  }
138 
139  pImpl->pJobManager->Initialize();
140  pImpl->pInitialized = true;
141  return true;
142  }
143 
144  //----------------------------------------------------------------------------
145  // Finalizer
146  //----------------------------------------------------------------------------
148  {
149  //--------------------------------------------------------------------------
150  // Clean up the channels
151  //--------------------------------------------------------------------------
152  if( !pImpl->pInitialized )
153  return true;
154 
155  pImpl->pInitialized = false;
156  pImpl->pJobManager->Finalize();
157  PostMasterImpl::ChannelMap::iterator it;
158 
159  for( it = pImpl->pChannelMap.begin(); it != pImpl->pChannelMap.end(); ++it )
160  delete it->second;
161 
162  pImpl->pChannelMap.clear();
163  return pImpl->pPoller->Finalize();
164  }
165 
166  //----------------------------------------------------------------------------
167  // Start the post master
168  //----------------------------------------------------------------------------
170  {
171  if( !pImpl->pInitialized )
172  return false;
173 
174  if( !pImpl->pPoller->Start() )
175  return false;
176 
177  if( !pImpl->pTaskManager->Start() )
178  {
179  pImpl->pPoller->Stop();
180  return false;
181  }
182 
183  if( !pImpl->pJobManager->Start() )
184  {
185  pImpl->pPoller->Stop();
186  pImpl->pTaskManager->Stop();
187  return false;
188  }
189 
190  pImpl->pRunning = true;
191  return true;
192  }
193 
194  //----------------------------------------------------------------------------
195  // Stop the postmaster
196  //----------------------------------------------------------------------------
198  {
199  if( !pImpl->pInitialized || !pImpl->pRunning )
200  return true;
201 
202  if( !pImpl->pJobManager->Stop() )
203  return false;
204  if( !pImpl->pPoller->Stop() )
205  return false;
206  if( !pImpl->pTaskManager->Stop() )
207  return false;
208  pImpl->pRunning = false;
209  return true;
210  }
211 
212  //----------------------------------------------------------------------------
213  // Reinitialize after fork
214  //----------------------------------------------------------------------------
216  {
217  return true;
218  }
219 
220  //----------------------------------------------------------------------------
221  // Send the message asynchronously
222  //----------------------------------------------------------------------------
224  Message *msg,
225  MsgHandler *handler,
226  bool stateful,
227  time_t expires )
228  {
229  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
230  Channel *channel = GetChannel( url );
231 
232  if( !channel )
234 
235  return channel->Send( msg, handler, stateful, expires );
236  }
237 
239  Message *msg,
240  MsgHandler *inHandler )
241  {
243  VirtualRedirector *redirector = registry.Get( url );
244  if( !redirector )
245  return Status( stError, errInvalidOp );
246  return redirector->HandleRequest( msg, inHandler );
247  }
248 
249  //----------------------------------------------------------------------------
250  // Query the transport handler
251  //----------------------------------------------------------------------------
253  uint16_t query,
254  AnyObject &result )
255  {
256  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
257  Channel *channel = 0;
258  {
259  XrdSysMutexHelper scopedLock2( pImpl->pChannelMapMutex );
260  PostMasterImpl::ChannelMap::iterator it =
261  pImpl->pChannelMap.find( url.GetChannelId() );
262  if( it == pImpl->pChannelMap.end() )
263  return Status( stError, errInvalidOp );
264  channel = it->second;
265  }
266 
267  if( !channel )
268  return Status( stError, errNotSupported );
269 
270  return channel->QueryTransport( query, result );
271  }
272 
273  //----------------------------------------------------------------------------
274  // Register channel event handler
275  //----------------------------------------------------------------------------
277  ChannelEventHandler *handler )
278  {
279  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
280  Channel *channel = GetChannel( url );
281 
282  if( !channel )
283  return Status( stError, errNotSupported );
284 
285  channel->RegisterEventHandler( handler );
286  return Status();
287  }
288 
289  //----------------------------------------------------------------------------
290  // Remove a channel event handler
291  //----------------------------------------------------------------------------
293  ChannelEventHandler *handler )
294  {
295  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
296  Channel *channel = GetChannel( url );
297 
298  if( !channel )
299  return Status( stError, errNotSupported );
300 
301  channel->RemoveEventHandler( handler );
302  return Status();
303  }
304 
305  //------------------------------------------------------------------------
306  // Get the task manager object used by the post master
307  //------------------------------------------------------------------------
309  {
310  return pImpl->pTaskManager;
311  }
312 
313  //------------------------------------------------------------------------
314  // Get the job manager object used by the post master
315  //------------------------------------------------------------------------
317  {
318  return pImpl->pJobManager;
319  }
320 
321  //------------------------------------------------------------------------
322  // Shut down a channel
323  //------------------------------------------------------------------------
325  {
326  return ForceDisconnect(url, false);
327  }
328 
329  //------------------------------------------------------------------------
330  // Shut down a channel
331  //------------------------------------------------------------------------
332  Status PostMaster::ForceDisconnect( const URL &url, bool hush )
333  {
334  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
335  {
336  //--------------------------------------------------------------------
337  // See if this is called by channel replaced by collapse, reaching TTL
338  //--------------------------------------------------------------------
339  PostMasterImpl::CollapsedMap::iterator it =
340  pImpl->pCollapsedMap.find( &url );
341  if( it != pImpl->pCollapsedMap.end() )
342  {
343  Channel *passive = it->second;
344  passive->ForceDisconnect( hush );
345  delete passive;
346  pImpl->pCollapsedMap.erase( it );
347  return Status();
348  }
349  }
350 
351  PostMasterImpl::ChannelMap::iterator it =
352  pImpl->pChannelMap.find( url.GetChannelId() );
353 
354  if( it == pImpl->pChannelMap.end() )
355  return Status( stError, errInvalidOp );
356 
357  it->second->ForceDisconnect( hush );
358  delete it->second;
359  pImpl->pChannelMap.erase( it );
360 
361  return Status();
362  }
363 
365  {
366  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock, false );
367  PostMasterImpl::ChannelMap::iterator it =
368  pImpl->pChannelMap.find( url.GetChannelId() );
369 
370  if( it == pImpl->pChannelMap.end() )
371  return Status( stError, errInvalidOp );
372 
373  it->second->ForceReconnect();
374  return Status();
375  }
376 
377  //------------------------------------------------------------------------
378  // Get the number of connected data streams
379  //------------------------------------------------------------------------
380  uint16_t PostMaster::NbConnectedStrm( const URL &url )
381  {
382  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
383  Channel *channel = GetChannel( url );
384  if( !channel ) return 0;
385  return channel->NbConnectedStrm();
386  }
387 
388  //------------------------------------------------------------------------
390  //------------------------------------------------------------------------
392  std::shared_ptr<Job> onConnJob )
393  {
394  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
395  Channel *channel = GetChannel( url );
396  if( !channel ) return;
397  channel->SetOnDataConnectHandler( onConnJob );
398  }
399 
400  //------------------------------------------------------------------------
402  //------------------------------------------------------------------------
403  void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
404  {
405  XrdSysMutexHelper lck( pImpl->pMtx );
406  pImpl->pOnConnJob = std::move( onConnJob );
407  }
408 
409  //------------------------------------------------------------------------
410  // Set the global connection error handler
411  //------------------------------------------------------------------------
412  void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
413  {
414  XrdSysMutexHelper lck( pImpl->pMtx );
415  pImpl->pOnConnErrCB = std::move( handler );
416  }
417 
418  //------------------------------------------------------------------------
419  // Notify the global on-connect handler
420  //------------------------------------------------------------------------
422  {
423  XrdSysMutexHelper lck( pImpl->pMtx );
424  if( pImpl->pOnConnJob )
425  {
426  URL *ptr = new URL( url );
427  pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
428  }
429  }
430 
431  //------------------------------------------------------------------------
432  // Notify the global error connection handler
433  //------------------------------------------------------------------------
434  void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
435  {
436  XrdSysMutexHelper lck( pImpl->pMtx );
437  if( pImpl->pOnConnErrCB )
438  {
439  ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
440  pImpl->pJobManager->QueueJob( job, nullptr );
441  }
442  }
443 
444  //----------------------------------------------------------------------------
446  //----------------------------------------------------------------------------
447  void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
448  {
449  XrdSysRWLockHelper scopedDiscLock( pImpl->pDisconnectLock );
450  XrdSysMutexHelper scopedMapLock( pImpl->pChannelMapMutex );
451 
452  //--------------------------------------------------------------------------
453  // Get the passive channel
454  //--------------------------------------------------------------------------
455  PostMasterImpl::ChannelMap::iterator it =
456  pImpl->pChannelMap.find( alias.GetChannelId() );
457  Channel *passive = 0;
458  if( it != pImpl->pChannelMap.end() )
459  passive = it->second;
460  //--------------------------------------------------------------------------
461  // If the channel does not exist there's nothing to do
462  //--------------------------------------------------------------------------
463  else return;
464 
465  //--------------------------------------------------------------------------
466  // Check if this URL is eligible for collapsing
467  //--------------------------------------------------------------------------
468  if( !passive->CanCollapse( url ) ) return;
469 
470  //--------------------------------------------------------------------------
471  // Create the active channel
472  //--------------------------------------------------------------------------
474  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
475 
476  if( !trHandler )
477  {
478  Log *log = DefaultEnv::GetLog();
479  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
480  "protocol", url.GetProtocol().c_str() );
481  return;
482  }
483 
484  Log *log = DefaultEnv::GetLog();
485  log->Info( PostMasterMsg, "Label channel %s with alias %s.",
486  url.GetHostId().c_str(), alias.GetHostId().c_str() );
487 
488  Channel *active = new Channel( alias, pImpl->pPoller, trHandler,
489  pImpl->pTaskManager, pImpl->pJobManager, url );
490  pImpl->pChannelMap[alias.GetChannelId()] = active;
491  pImpl->pCollapsedMap[&passive->GetURL()] = passive;
492 
493  //--------------------------------------------------------------------------
494  // The passive channel will be deallocated by TTL
495  //--------------------------------------------------------------------------
496  }
497 
498  //------------------------------------------------------------------------
499  // Decrement file object instance count bound to this channel
500  //------------------------------------------------------------------------
501  void PostMaster::DecFileInstCnt( const URL &url )
502  {
503  XrdSysRWLockHelper scopedLock( pImpl->pDisconnectLock );
504  Channel *channel = GetChannel( url );
505 
506  if( !channel ) return;
507 
508  return channel->DecFileInstCnt();
509  }
510 
511  //------------------------------------------------------------------------
512  // True if the underlying threads are running, false otherwise
513  //------------------------------------------------------------------------
515  {
516  return pImpl->pRunning;
517  }
518 
519  //----------------------------------------------------------------------------
520  // Get the channel
521  //----------------------------------------------------------------------------
522  Channel *PostMaster::GetChannel( const URL &url )
523  {
524  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
525  Channel *channel = 0;
526  PostMasterImpl::ChannelMap::iterator it = pImpl->pChannelMap.find( url.GetChannelId() );
527 
528  if( it == pImpl->pChannelMap.end() )
529  {
531  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
532 
533  if( !trHandler )
534  {
535  Log *log = DefaultEnv::GetLog();
536  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
537  "protocol", url.GetProtocol().c_str() );
538  return 0;
539  }
540 
541  channel = new Channel( url, pImpl->pPoller, trHandler, pImpl->pTaskManager,
542  pImpl->pJobManager );
543  pImpl->pChannelMap[url.GetChannelId()] = channel;
544  }
545  else
546  channel = it->second;
547  return channel;
548  }
549 }
A communication channel between the client and the server.
Definition: XrdClChannel.hh:49
uint16_t NbConnectedStrm()
Get the number of connected data streams.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
const URL & GetURL() const
Get the URL.
Definition: XrdClChannel.hh:75
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void DecFileInstCnt()
Decrement file object instance count bound to this channel.
Status ForceDisconnect()
Force disconnect of all streams.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
bool CanCollapse(const URL &url)
Status QueryTransport(uint16_t query, AnyObject &result)
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
Definition: XrdClPoller.hh:87
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object used by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global on-connect handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object used by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:512
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const char *const DefaultPollerPreference
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
XRootDStatus status
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, Channel * > ChannelMap
std::unique_ptr< Job > pOnConnJob
XrdSysRWLock pDisconnectLock
XrdSysMutex pChannelMapMutex
std::map< const URL *, Channel * > CollapsedMap
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
CollapsedMap pCollapsedMap
Procedure execution status.
Definition: XrdClStatus.hh:115