XRootD
XrdClPostMaster.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #include "XrdCl/XrdClPostMaster.hh"
22 #include "XrdCl/XrdClMessage.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClPoller.hh"
27 #include "XrdCl/XrdClJobManager.hh"
29 #include "XrdCl/XrdClChannel.hh"
30 #include "XrdCl/XrdClConstants.hh"
31 #include "XrdCl/XrdClLog.hh"
33 
34 #include "XrdSys/XrdSysPthread.hh"
35 
36 #include <unordered_set>
37 
38 namespace XrdCl
39 {
40  struct ConnErrJob : public Job
41  {
43  std::function<void( const URL&, const XRootDStatus& )> handler) : url( url ),
44  status( status ),
45  handler( handler )
46  {
47  }
48 
49  void Run( void *arg )
50  {
51  handler( url, status );
52  delete this;
53  }
54 
57  std::function<void( const URL&, const XRootDStatus& )> handler;
58  };
59 
61  {
62  PostMasterImpl() : pPoller( 0 ), pInitialized( false ), pRunning( false )
63  {
64  Env *env = DefaultEnv::GetEnv();
65  int workerThreads = DefaultWorkerThreads;
66  env->GetInt( "WorkerThreads", workerThreads );
67 
68  pTaskManager = new TaskManager();
69  pJobManager = new JobManager(workerThreads);
70  }
71 
73  {
74  delete pPoller;
75  delete pTaskManager;
76  delete pJobManager;
77  }
78 
79  //--------------------------------------------------------------------------
81  //--------------------------------------------------------------------------
82  void addFinalize(Channel *ch)
83  {
85  pFinalizeSet.insert( ch );
86  }
87 
88  //--------------------------------------------------------------------------
90  //--------------------------------------------------------------------------
91  std::shared_ptr<Channel> GetChannel( const URL &url );
92 
93  //--------------------------------------------------------------------------
95  //--------------------------------------------------------------------------
97  {
99  pFinalizeSet.erase( ch );
100  }
101 
102  typedef std::map<std::string, std::shared_ptr<Channel> > ChannelMap;
103 
107  std::unordered_set<Channel*> pFinalizeSet;
108 
109  // Mutex protecting access of pChannelMap
111 
112  // Mutex protecting access of pFinalizeSet
114 
116  bool pRunning;
118 
120  std::unique_ptr<Job> pOnConnJob;
121  std::function<void( const URL&, const XRootDStatus& )> pOnConnErrCB;
122  };
123 
124  //----------------------------------------------------------------------------
125  // Constructor
126  //----------------------------------------------------------------------------
128  {
129  }
130 
131  //----------------------------------------------------------------------------
132  // Destructor
133  //----------------------------------------------------------------------------
135  {
136  }
137 
138  //----------------------------------------------------------------------------
139  // Initializer
140  //----------------------------------------------------------------------------
142  {
143  Env *env = DefaultEnv::GetEnv();
144  std::string pollerPref = DefaultPollerPreference;
145  env->GetString( "PollerPreference", pollerPref );
146 
147  pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
148 
149  if( !pImpl->pPoller )
150  return false;
151 
152  bool st = pImpl->pPoller->Initialize();
153 
154  if( !st )
155  {
156  delete pImpl->pPoller;
157  return false;
158  }
159 
160  pImpl->pJobManager->Initialize();
161  pImpl->pInitialized = true;
162  return true;
163  }
164 
165  //----------------------------------------------------------------------------
166  // Finalizer
167  //----------------------------------------------------------------------------
169  {
170  //--------------------------------------------------------------------------
171  // Clean up the channels
172  //--------------------------------------------------------------------------
173  if( !pImpl->pInitialized )
174  return true;
175 
176  pImpl->pInitialized = false;
177  pImpl->pJobManager->Finalize();
178 
179  //--------------------------------------------------------------------------
180  // Finalize may cause some of the channels to remove themselves from the
181  // finalize set. So make a copy. Should be no concurrency as poller and
182  // jobmanager are stopped, so no lock.
183  //--------------------------------------------------------------------------
184  auto finSet = pImpl->pFinalizeSet;
185  for( auto ch: finSet ) ch->Finalize();
186 
187  pImpl->pChannelMap.clear();
188  return pImpl->pPoller->Finalize();
189  }
190 
191  //----------------------------------------------------------------------------
192  // Start the post master
193  //----------------------------------------------------------------------------
195  {
196  if( !pImpl->pInitialized )
197  return false;
198 
199  if( !pImpl->pPoller->Start() )
200  return false;
201 
202  if( !pImpl->pTaskManager->Start() )
203  {
204  pImpl->pPoller->Stop();
205  return false;
206  }
207 
208  if( !pImpl->pJobManager->Start() )
209  {
210  pImpl->pPoller->Stop();
211  pImpl->pTaskManager->Stop();
212  return false;
213  }
214 
215  pImpl->pRunning = true;
216  return true;
217  }
218 
219  //----------------------------------------------------------------------------
220  // Stop the postmaster
221  //----------------------------------------------------------------------------
223  {
224  if( !pImpl->pInitialized || !pImpl->pRunning )
225  return true;
226 
227  if( !pImpl->pJobManager->Stop() )
228  return false;
229  if( !pImpl->pPoller->Stop() )
230  return false;
231  if( !pImpl->pTaskManager->Stop() )
232  return false;
233  pImpl->pRunning = false;
234  return true;
235  }
236 
237  //----------------------------------------------------------------------------
238  // Reinitialize after fork
239  //----------------------------------------------------------------------------
241  {
242  return true;
243  }
244 
245  //----------------------------------------------------------------------------
246  // Send the message asynchronously
247  //----------------------------------------------------------------------------
249  Message *msg,
250  MsgHandler *handler,
251  bool stateful,
252  time_t expires )
253  {
254  auto channel = pImpl->GetChannel( url );
255 
256  if( !channel )
258 
259  return channel->Send( msg, handler, stateful, expires );
260  }
261 
263  Message *msg,
264  MsgHandler *inHandler )
265  {
267  VirtualRedirector *redirector = registry.Get( url );
268  if( !redirector )
269  return Status( stError, errInvalidOp );
270  return redirector->HandleRequest( msg, inHandler );
271  }
272 
273  //----------------------------------------------------------------------------
274  // Query the transport handler
275  //----------------------------------------------------------------------------
277  uint16_t query,
278  AnyObject &result )
279  {
280  std::shared_ptr<Channel> channel;
281  {
282  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
283  PostMasterImpl::ChannelMap::iterator it =
284  pImpl->pChannelMap.find( url.GetChannelId() );
285  if( it == pImpl->pChannelMap.end() )
286  return Status( stError, errInvalidOp );
287  channel = it->second;
288  }
289 
290  if( !channel )
291  return Status( stError, errNotSupported );
292 
293  return channel->QueryTransport( query, result );
294  }
295 
296  //----------------------------------------------------------------------------
297  // Register channel event handler
298  //----------------------------------------------------------------------------
300  ChannelEventHandler *handler )
301  {
302  auto channel = pImpl->GetChannel( url );
303 
304  if( !channel )
305  return Status( stError, errNotSupported );
306 
307  channel->RegisterEventHandler( handler );
308  return Status();
309  }
310 
311  //----------------------------------------------------------------------------
312  // Remove a channel event handler
313  //----------------------------------------------------------------------------
315  ChannelEventHandler *handler )
316  {
317  auto channel = pImpl->GetChannel( url );
318 
319  if( !channel )
320  return Status( stError, errNotSupported );
321 
322  channel->RemoveEventHandler( handler );
323  return Status();
324  }
325 
326  //------------------------------------------------------------------------
327 // Get the task manager object used by the post master
328  //------------------------------------------------------------------------
330  {
331  return pImpl->pTaskManager;
332  }
333 
334  //------------------------------------------------------------------------
335 // Get the job manager object used by the post master
336  //------------------------------------------------------------------------
338  {
339  return pImpl->pJobManager;
340  }
341 
342  //------------------------------------------------------------------------
343  // Shut down a channel
344  //------------------------------------------------------------------------
346  {
347  return ForceDisconnect(url, false);
348  }
349 
350  //------------------------------------------------------------------------
351  // Shut down a channel
352  //------------------------------------------------------------------------
353  Status PostMaster::ForceDisconnect( const URL &url, bool hush )
354  {
355  std::shared_ptr<Channel> channel;
356  {
357  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
358  PostMasterImpl::ChannelMap::iterator it =
359  pImpl->pChannelMap.find( url.GetChannelId() );
360 
361  if( it == pImpl->pChannelMap.end() )
362  return Status( stError, errInvalidOp );
363  channel = it->second;
364  pImpl->pChannelMap.erase( it );
365  }
366 
367  channel->ForceDisconnect( hush );
368  return Status();
369  }
370 
371  //------------------------------------------------------------------------
372  // Shut down a channel. This version is used by the channel itself.
373  //------------------------------------------------------------------------
374  Status PostMaster::ForceDisconnect( std::shared_ptr<Channel> channel,
375  const uint64_t sess )
376  {
377  if( !channel )
378  return Status( stError, errNotSupported );
379 
380  {
381  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
382  PostMasterImpl::ChannelMap::iterator it =
383  pImpl->pChannelMap.find( channel->GetURL().GetChannelId() );
384 
385  if( it != pImpl->pChannelMap.end() && it->second == channel )
386  pImpl->pChannelMap.erase( it );
387  }
388 
389  channel->ForceDisconnect( channel, sess );
390  return Status();
391  }
392 
394  {
395  std::shared_ptr<Channel> channel;
396  {
397  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
398  PostMasterImpl::ChannelMap::iterator it =
399  pImpl->pChannelMap.find( url.GetChannelId() );
400 
401  if( it == pImpl->pChannelMap.end() )
402  return Status( stError, errInvalidOp );
403  channel = it->second;
404  }
405 
406  channel->ForceReconnect();
407  return Status();
408  }
409 
410  //------------------------------------------------------------------------
411  // Get the number of connected data streams
412  //------------------------------------------------------------------------
413  uint16_t PostMaster::NbConnectedStrm( const URL &url )
414  {
415  auto channel = pImpl->GetChannel( url );
416  if( !channel ) return 0;
417  return channel->NbConnectedStrm();
418  }
419 
420  //------------------------------------------------------------------------
422  //------------------------------------------------------------------------
424  std::shared_ptr<Job> onConnJob )
425  {
426  auto channel = pImpl->GetChannel( url );
427  if( !channel ) return;
428  channel->SetOnDataConnectHandler( onConnJob );
429  }
430 
431  //------------------------------------------------------------------------
433  //------------------------------------------------------------------------
434  void PostMaster::SetOnConnectHandler( std::unique_ptr<Job> onConnJob )
435  {
436  XrdSysMutexHelper lck( pImpl->pMtx );
437  pImpl->pOnConnJob = std::move( onConnJob );
438  }
439 
440  //------------------------------------------------------------------------
441  // Set the global connection error handler
442  //------------------------------------------------------------------------
443  void PostMaster::SetConnectionErrorHandler( std::function<void( const URL&, const XRootDStatus& )> handler )
444  {
445  XrdSysMutexHelper lck( pImpl->pMtx );
446  pImpl->pOnConnErrCB = std::move( handler );
447  }
448 
449  //------------------------------------------------------------------------
450  // Notify the global on-connect handler
451  //------------------------------------------------------------------------
453  {
454  XrdSysMutexHelper lck( pImpl->pMtx );
455  if( pImpl->pOnConnJob )
456  {
457  URL *ptr = new URL( url );
458  pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
459  }
460  }
461 
462  //------------------------------------------------------------------------
463  // Notify the global error connection handler
464  //------------------------------------------------------------------------
465  void PostMaster::NotifyConnErrHandler( const URL &url, const XRootDStatus &status )
466  {
467  XrdSysMutexHelper lck( pImpl->pMtx );
468  if( pImpl->pOnConnErrCB )
469  {
470  ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
471  pImpl->pJobManager->QueueJob( job, nullptr );
472  }
473  }
474 
475  //----------------------------------------------------------------------------
477  //----------------------------------------------------------------------------
478  void PostMaster::CollapseRedirect( const URL &alias, const URL &url )
479  {
480  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
481  //--------------------------------------------------------------------------
482  // Get the passive channel
483  //--------------------------------------------------------------------------
484  std::shared_ptr<Channel> passive;
485  PostMasterImpl::ChannelMap::iterator it =
486  pImpl->pChannelMap.find( alias.GetChannelId() );
487  if( it != pImpl->pChannelMap.end() )
488  passive = it->second;
489 
490  //--------------------------------------------------------------------------
491  // If the channel does not exist there's nothing to do
492  //--------------------------------------------------------------------------
493  if( !passive ) return;
494 
495  //--------------------------------------------------------------------------
496  // Check if this URL is eligible for collapsing. To avoid depencencies
497  // we don't call CanCollapse while holding the channel map mutex. So we
498  // reverify the content of the map afterwards.
499  //--------------------------------------------------------------------------
500  scopedLock.UnLock();
501  if( !passive->CanCollapse( url ) ) return;
502 
503  scopedLock.Lock( &pImpl->pChannelMapMutex );
504  it = pImpl->pChannelMap.find( alias.GetChannelId() );
505  if( it == pImpl->pChannelMap.end() || it->second != passive )
506  {
507  // something changed. Retry.
508  scopedLock.UnLock();
509  CollapseRedirect( alias, url );
510  return;
511  }
512 
513  //--------------------------------------------------------------------------
514  // Create the active channel
515  //--------------------------------------------------------------------------
517  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
518 
519  if( !trHandler )
520  {
521  Log *log = DefaultEnv::GetLog();
522  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
523  "protocol", url.GetProtocol().c_str() );
524  return;
525  }
526 
527  Log *log = DefaultEnv::GetLog();
528  log->Info( PostMasterMsg, "Label channel %s with alias %s.",
529  url.GetHostId().c_str(), alias.GetHostId().c_str() );
530 
531  std::shared_ptr<Channel> active(new Channel{ alias,
532  pImpl->pPoller, trHandler, pImpl->pTaskManager, pImpl->pJobManager, url },
533  [this](Channel *ch) { this->pImpl->removeFinalize( ch ); delete ch; });
534  pImpl->addFinalize( active.get() );
535  active->SetSelf( active );
536 
537  pImpl->pChannelMap[alias.GetChannelId()] = active;
538  //--------------------------------------------------------------------------
539  // The passive channel will be deallocated by TTL
540  //--------------------------------------------------------------------------
541  }
542 
543  //------------------------------------------------------------------------
544  // Decrement file object instance count bound to this channel
545  //------------------------------------------------------------------------
546  void PostMaster::DecFileInstCnt( const URL &url )
547  {
548  auto channel = pImpl->GetChannel( url );
549 
550  if( !channel ) return;
551 
552  return channel->DecFileInstCnt();
553  }
554 
555  //------------------------------------------------------------------------
556  //true if underlying threads are running, false otherwise
557  //------------------------------------------------------------------------
559  {
560  return pImpl->pRunning;
561  }
562 
563  //----------------------------------------------------------------------------
564  // Get the channel
565  //----------------------------------------------------------------------------
566  std::shared_ptr<Channel> PostMasterImpl::GetChannel( const URL &url )
567  {
568  XrdSysMutexHelper scopedLock( pChannelMapMutex );
569  std::shared_ptr<Channel> channel;
570  PostMasterImpl::ChannelMap::iterator it = pChannelMap.find( url.GetChannelId() );
571 
572  if( it == pChannelMap.end() )
573  {
575  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
576 
577  if( !trHandler )
578  {
579  Log *log = DefaultEnv::GetLog();
580  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
581  "protocol", url.GetProtocol().c_str() );
582  return 0;
583  }
584 
585  std::shared_ptr<Channel> newchan(new Channel{ url, pPoller,
586  trHandler, pTaskManager, pJobManager },
587  [this](Channel *ch) { this->removeFinalize( ch ); delete ch; });
588  addFinalize( newchan.get() );
589  channel = newchan;
590  channel->SetSelf( channel );
591 
592  pChannelMap[url.GetChannelId()] = channel;
593  }
594  else
595  channel = it->second;
596  return channel;
597  }
598 }
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A synchronized queue.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
static Poller * CreatePoller(const std::string &preference)
Interface for socket pollers.
Definition: XrdClPoller.hh:88
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
bool Start()
Start the post master.
bool Finalize()
Finalizer.
XRootDStatus Send(const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Status ForceReconnect(const URL &url)
Reconnect the channel.
bool Stop()
Stop the postmaster.
bool Reinitialize()
Reinitialize after fork.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
uint16_t NbConnectedStrm(const URL &url)
Get the number of connected data streams.
void SetOnConnectHandler(std::unique_ptr< Job > onConnJob)
Set the global connection error handler.
Status RemoveEventHandler(const URL &url, ChannelEventHandler *handler)
Remove a channel event handler.
virtual ~PostMaster()
Destructor.
PostMaster()
Constructor.
void SetConnectionErrorHandler(std::function< void(const URL &, const XRootDStatus &)> handler)
Set the global on-error on-connect handler for control streams.
Status ForceDisconnect(const URL &url)
Shut down a channel.
Status Redirect(const URL &url, Message *msg, MsgHandler *handler)
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
Status RegisterEventHandler(const URL &url, ChannelEventHandler *handler)
Register channel event handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
JobManager * GetJobManager()
Get the job manager object user by the post master.
bool Initialize()
Initializer.
void DecFileInstCnt(const URL &url)
Decrement file object instance count bound to this channel.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Perform the handshake and the authentication for each physical stream.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
URL representation.
Definition: XrdClURL.hh:31
std::string GetChannelId() const
Definition: XrdClURL.cc:512
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:99
const std::string & GetProtocol() const
Get the protocol.
Definition: XrdClURL.hh:118
An interface for metadata redirectors.
virtual XRootDStatus HandleRequest(const Message *msg, MsgHandler *handler)=0
void Lock(XrdSysMutex *Mutex)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51
const char *const DefaultPollerPreference
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const int DefaultWorkerThreads
void Run(void *arg)
The job logic.
XRootDStatus status
ConnErrJob(const URL &url, const XRootDStatus &status, std::function< void(const URL &, const XRootDStatus &)> handler)
std::function< void(const URL &, const XRootDStatus &)> handler
TaskManager * pTaskManager
std::map< std::string, std::shared_ptr< Channel > > ChannelMap
std::unique_ptr< Job > pOnConnJob
void addFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::unordered_set< Channel * > pFinalizeSet
std::function< void(const URL &, const XRootDStatus &)> pOnConnErrCB
void removeFinalize(Channel *ch)
Used to maintain a non-owning set of live Channels. Used by Finalize.
std::shared_ptr< Channel > GetChannel(const URL &url)
Get a channel for url, creating one if needed.
Procedure execution status.
Definition: XrdClStatus.hh:115