XRootD
XrdCl::PostMaster Class Reference

A hub for dispatching and receiving messages. More...

#include <XrdClPostMaster.hh>

+ Collaboration diagram for XrdCl::PostMaster:

Public Member Functions

 PostMaster ()
 Constructor. More...
 
virtual ~PostMaster ()
 Destructor. More...
 
void CollapseRedirect (const URL &oldurl, const URL &newURL)
 Collapse channel URL - replace the URL of the channel. More...
 
void DecFileInstCnt (const URL &url)
 Decrement file object instance count bound to this channel. More...
 
bool Finalize ()
 Finalizer. More...
 
Status ForceDisconnect (const URL &url)
 Shut down a channel. More...
 
Status ForceDisconnect (const URL &url, bool hush)
 Shut down a channel. More...
 
Status ForceDisconnect (std::shared_ptr< Channel > channel, const uint64_t sess)
 Shut down a channel. This version is used by the channel itself. More...
 
Status ForceReconnect (const URL &url)
 Reconnect the channel. More...
 
JobManagerGetJobManager ()
 Get the job manager object user by the post master. More...
 
TaskManagerGetTaskManager ()
 Get the task manager object user by the post master. More...
 
bool Initialize ()
 Initializer. More...
 
bool IsRunning ()
 
uint16_t NbConnectedStrm (const URL &url)
 Get the number of connected data streams. More...
 
void NotifyConnectHandler (const URL &url)
 Notify the global on-connect handler. More...
 
void NotifyConnErrHandler (const URL &url, const XRootDStatus &status)
 Notify the global error connection handler. More...
 
Status QueryTransport (const URL &url, uint16_t query, AnyObject &result)
 
Status Redirect (const URL &url, Message *msg, MsgHandler *handler)
 
Status RegisterEventHandler (const URL &url, ChannelEventHandler *handler)
 Register channel event handler. More...
 
bool Reinitialize ()
 Reinitialize after fork. More...
 
Status RemoveEventHandler (const URL &url, ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (const URL &url, Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 
void SetConnectionErrorHandler (std::function< void(const URL &, const XRootDStatus &)> handler)
 Set the global on-error on-connect handler for control streams. More...
 
void SetOnConnectHandler (std::unique_ptr< Job > onConnJob)
 Set the global connection error handler. More...
 
void SetOnDataConnectHandler (const URL &url, std::shared_ptr< Job > onConnJob)
 Set the on-connect handler for data streams. More...
 
bool Start ()
 Start the post master. More...
 
bool Stop ()
 Stop the postmaster. More...
 

Detailed Description

A hub for dispatching and receiving messages.

Definition at line 47 of file XrdClPostMaster.hh.

Constructor & Destructor Documentation

◆ PostMaster()

XrdCl::PostMaster::PostMaster ( )

Constructor.

Definition at line 127 of file XrdClPostMaster.cc.

127  : pImpl( new PostMasterImpl() )
128  {
129  }

◆ ~PostMaster()

XrdCl::PostMaster::~PostMaster ( )
virtual

Destructor.

Definition at line 134 of file XrdClPostMaster.cc.

135  {
136  }

Member Function Documentation

◆ CollapseRedirect()

void XrdCl::PostMaster::CollapseRedirect ( const URL oldurl,
const URL newURL 
)

Collapse channel URL - replace the URL of the channel.

Definition at line 478 of file XrdClPostMaster.cc.

479  {
480  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
481  //--------------------------------------------------------------------------
482  // Get the passive channel
483  //--------------------------------------------------------------------------
484  std::shared_ptr<Channel> passive;
485  PostMasterImpl::ChannelMap::iterator it =
486  pImpl->pChannelMap.find( alias.GetChannelId() );
487  if( it != pImpl->pChannelMap.end() )
488  passive = it->second;
489 
490  //--------------------------------------------------------------------------
491  // If the channel does not exist there's nothing to do
492  //--------------------------------------------------------------------------
493  if( !passive ) return;
494 
495  //--------------------------------------------------------------------------
496  // Check if this URL is eligible for collapsing. To avoid depencencies
497  // we don't call CanCollapse while holding the channel map mutex. So we
498  // reverify the content of the map afterwards.
499  //--------------------------------------------------------------------------
500  scopedLock.UnLock();
501  if( !passive->CanCollapse( url ) ) return;
502 
503  scopedLock.Lock( &pImpl->pChannelMapMutex );
504  it = pImpl->pChannelMap.find( alias.GetChannelId() );
505  if( it == pImpl->pChannelMap.end() || it->second != passive )
506  {
507  // something changed. Retry.
508  scopedLock.UnLock();
509  CollapseRedirect( alias, url );
510  return;
511  }
512 
513  //--------------------------------------------------------------------------
514  // Create the active channel
515  //--------------------------------------------------------------------------
516  TransportManager *trManager = DefaultEnv::GetTransportManager();
517  TransportHandler *trHandler = trManager->GetHandler( url.GetProtocol() );
518 
519  if( !trHandler )
520  {
521  Log *log = DefaultEnv::GetLog();
522  log->Error( PostMasterMsg, "Unable to get transport handler for %s "
523  "protocol", url.GetProtocol().c_str() );
524  return;
525  }
526 
527  Log *log = DefaultEnv::GetLog();
528  log->Info( PostMasterMsg, "Label channel %s with alias %s.",
529  url.GetHostId().c_str(), alias.GetHostId().c_str() );
530 
531  std::shared_ptr<Channel> active(new Channel{ alias,
532  pImpl->pPoller, trHandler, pImpl->pTaskManager, pImpl->pJobManager, url },
533  [this](Channel *ch) { this->pImpl->removeFinalize( ch ); delete ch; });
534  pImpl->addFinalize( active.get() );
535  active->SetSelf( active );
536 
537  pImpl->pChannelMap[alias.GetChannelId()] = active;
538  //--------------------------------------------------------------------------
539  // The passive channel will be deallocated by TTL
540  //--------------------------------------------------------------------------
541  }
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
const uint64_t PostMasterMsg
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Error(), XrdCl::URL::GetChannelId(), XrdCl::TransportManager::GetHandler(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::URL::GetProtocol(), XrdCl::DefaultEnv::GetTransportManager(), XrdCl::Log::Info(), XrdSysMutexHelper::Lock(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DecFileInstCnt()

void XrdCl::PostMaster::DecFileInstCnt ( const URL url)

Decrement file object instance count bound to this channel.

Definition at line 546 of file XrdClPostMaster.cc.

547  {
548  auto channel = pImpl->GetChannel( url );
549 
550  if( !channel ) return;
551 
552  return channel->DecFileInstCnt();
553  }

Referenced by XrdCl::FileStateHandler::~FileStateHandler().

+ Here is the caller graph for this function:

◆ Finalize()

bool XrdCl::PostMaster::Finalize ( )

Finalizer.

Definition at line 168 of file XrdClPostMaster.cc.

169  {
170  //--------------------------------------------------------------------------
171  // Clean up the channels
172  //--------------------------------------------------------------------------
173  if( !pImpl->pInitialized )
174  return true;
175 
176  pImpl->pInitialized = false;
177  pImpl->pJobManager->Finalize();
178 
179  //--------------------------------------------------------------------------
180  // Finalize may cause some of the channels to remove themselves from the
181  // finalize set. So make a copy. Should be no concurrency as poller and
182  // jobmanager are stopped, so no lock.
183  //--------------------------------------------------------------------------
184  auto finSet = pImpl->pFinalizeSet;
185  for( auto ch: finSet ) ch->Finalize();
186 
187  pImpl->pChannelMap.clear();
188  return pImpl->pPoller->Finalize();
189  }

Referenced by XrdCl::ForkHandler::Child(), and XrdCl::DefaultEnv::GetPostMaster().

+ Here is the caller graph for this function:

◆ ForceDisconnect() [1/3]

Status XrdCl::PostMaster::ForceDisconnect ( const URL url)

Shut down a channel.

Definition at line 345 of file XrdClPostMaster.cc.

346  {
347  return ForceDisconnect(url, false);
348  }
Status ForceDisconnect(const URL &url)
Shut down a channel.

Referenced by XrdCl::Stream::OnReadTimeout().

+ Here is the caller graph for this function:

◆ ForceDisconnect() [2/3]

Status XrdCl::PostMaster::ForceDisconnect ( const URL url,
bool  hush 
)

Shut down a channel.

Definition at line 353 of file XrdClPostMaster.cc.

354  {
355  std::shared_ptr<Channel> channel;
356  {
357  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
358  PostMasterImpl::ChannelMap::iterator it =
359  pImpl->pChannelMap.find( url.GetChannelId() );
360 
361  if( it == pImpl->pChannelMap.end() )
362  return Status( stError, errInvalidOp );
363  channel = it->second;
364  pImpl->pChannelMap.erase( it );
365  }
366 
367  channel->ForceDisconnect( hush );
368  return Status();
369  }
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInvalidOp
Definition: XrdClStatus.hh:51

References XrdCl::errInvalidOp, XrdCl::URL::GetChannelId(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ ForceDisconnect() [3/3]

Status XrdCl::PostMaster::ForceDisconnect ( std::shared_ptr< Channel channel,
const uint64_t  sess 
)

Shut down a channel. This version is used by the channel itself.

Definition at line 374 of file XrdClPostMaster.cc.

376  {
377  if( !channel )
378  return Status( stError, errNotSupported );
379 
380  {
381  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
382  PostMasterImpl::ChannelMap::iterator it =
383  pImpl->pChannelMap.find( channel->GetURL().GetChannelId() );
384 
385  if( it != pImpl->pChannelMap.end() && it->second == channel )
386  pImpl->pChannelMap.erase( it );
387  }
388 
389  channel->ForceDisconnect( channel, sess );
390  return Status();
391  }
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62

References XrdCl::errNotSupported, and XrdCl::stError.

◆ ForceReconnect()

Status XrdCl::PostMaster::ForceReconnect ( const URL url)

Reconnect the channel.

Definition at line 393 of file XrdClPostMaster.cc.

394  {
395  std::shared_ptr<Channel> channel;
396  {
397  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
398  PostMasterImpl::ChannelMap::iterator it =
399  pImpl->pChannelMap.find( url.GetChannelId() );
400 
401  if( it == pImpl->pChannelMap.end() )
402  return Status( stError, errInvalidOp );
403  channel = it->second;
404  }
405 
406  channel->ForceReconnect();
407  return Status();
408  }

References XrdCl::errInvalidOp, XrdCl::URL::GetChannelId(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ GetJobManager()

JobManager * XrdCl::PostMaster::GetJobManager ( )

Get the job manager object user by the post master.

Definition at line 337 of file XrdClPostMaster.cc.

338  {
339  return pImpl->pJobManager;
340  }

Referenced by XrdCl::FileStateHandler::Close(), XrdCl::Operation< HasHndl >::Run(), XrdEc::ScheduleHandler(), and XrdCl::FileStateHandler::TimeOutRequests().

+ Here is the caller graph for this function:

◆ GetTaskManager()

TaskManager * XrdCl::PostMaster::GetTaskManager ( )

Get the task manager object user by the post master.

Definition at line 329 of file XrdClPostMaster.cc.

330  {
331  return pImpl->pTaskManager;
332  }

Referenced by XrdCl::ForkHandler::Child(), XrdCl::DefaultEnv::GetPostMaster(), and XrdCl::XRootDMsgHandler::Process().

+ Here is the caller graph for this function:

◆ Initialize()

bool XrdCl::PostMaster::Initialize ( )

Initializer.

Definition at line 141 of file XrdClPostMaster.cc.

142  {
143  Env *env = DefaultEnv::GetEnv();
144  std::string pollerPref = DefaultPollerPreference;
145  env->GetString( "PollerPreference", pollerPref );
146 
147  pImpl->pPoller = PollerFactory::CreatePoller( pollerPref );
148 
149  if( !pImpl->pPoller )
150  return false;
151 
152  bool st = pImpl->pPoller->Initialize();
153 
154  if( !st )
155  {
156  delete pImpl->pPoller;
157  return false;
158  }
159 
160  pImpl->pJobManager->Initialize();
161  pImpl->pInitialized = true;
162  return true;
163  }
static Env * GetEnv()
Get default client environment.
static Poller * CreatePoller(const std::string &preference)
const char *const DefaultPollerPreference

References XrdCl::PollerFactory::CreatePoller(), XrdCl::DefaultPollerPreference, XrdCl::DefaultEnv::GetEnv(), and XrdCl::Env::GetString().

Referenced by XrdCl::ForkHandler::Child(), and XrdCl::DefaultEnv::GetPostMaster().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ IsRunning()

bool XrdCl::PostMaster::IsRunning ( )
Returns
: true if underlying threads are running, false otherwise

Definition at line 558 of file XrdClPostMaster.cc.

559  {
560  return pImpl->pRunning;
561  }

◆ NbConnectedStrm()

uint16_t XrdCl::PostMaster::NbConnectedStrm ( const URL url)

Get the number of connected data streams.

Definition at line 413 of file XrdClPostMaster.cc.

414  {
415  auto channel = pImpl->GetChannel( url );
416  if( !channel ) return 0;
417  return channel->NbConnectedStrm();
418  }

◆ NotifyConnectHandler()

void XrdCl::PostMaster::NotifyConnectHandler ( const URL url)

Notify the global on-connect handler.

Definition at line 452 of file XrdClPostMaster.cc.

453  {
454  XrdSysMutexHelper lck( pImpl->pMtx );
455  if( pImpl->pOnConnJob )
456  {
457  URL *ptr = new URL( url );
458  pImpl->pJobManager->QueueJob( pImpl->pOnConnJob.get(), ptr );
459  }
460  }

Referenced by XrdCl::Stream::OnConnect().

+ Here is the caller graph for this function:

◆ NotifyConnErrHandler()

void XrdCl::PostMaster::NotifyConnErrHandler ( const URL url,
const XRootDStatus status 
)

Notify the global error connection handler.

Definition at line 465 of file XrdClPostMaster.cc.

466  {
467  XrdSysMutexHelper lck( pImpl->pMtx );
468  if( pImpl->pOnConnErrCB )
469  {
470  ConnErrJob *job = new ConnErrJob( url, status, pImpl->pOnConnErrCB );
471  pImpl->pJobManager->QueueJob( job, nullptr );
472  }
473  }

Referenced by XrdCl::Stream::OnConnectError().

+ Here is the caller graph for this function:

◆ QueryTransport()

Status XrdCl::PostMaster::QueryTransport ( const URL url,
uint16_t  query,
AnyObject result 
)

Query the transport handler for a given URL

Parameters
urlthe channel to be queried
querythe query as defined in the TransportQuery struct or others that may be recognized by the protocol transport
resultthe result of the query
Returns
status of the query

Definition at line 276 of file XrdClPostMaster.cc.

279  {
280  std::shared_ptr<Channel> channel;
281  {
282  XrdSysMutexHelper scopedLock( pImpl->pChannelMapMutex );
283  PostMasterImpl::ChannelMap::iterator it =
284  pImpl->pChannelMap.find( url.GetChannelId() );
285  if( it == pImpl->pChannelMap.end() )
286  return Status( stError, errInvalidOp );
287  channel = it->second;
288  }
289 
290  if( !channel )
291  return Status( stError, errNotSupported );
292 
293  return channel->QueryTransport( query, result );
294  }

References XrdCl::errInvalidOp, XrdCl::errNotSupported, XrdCl::URL::GetChannelId(), and XrdCl::stError.

Referenced by XrdCl::FileSystem::DirList(), XrdCl::FileStateHandler::PgRead(), and XrdCl::XRootDMsgHandler::Process().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Redirect()

Status XrdCl::PostMaster::Redirect ( const URL url,
Message msg,
MsgHandler handler 
)

Definition at line 262 of file XrdClPostMaster.cc.

265  {
266  RedirectorRegistry &registry = RedirectorRegistry::Instance();
267  VirtualRedirector *redirector = registry.Get( url );
268  if( !redirector )
269  return Status( stError, errInvalidOp );
270  return redirector->HandleRequest( msg, inHandler );
271  }
static RedirectorRegistry & Instance()
Returns reference to the single instance.

References XrdCl::errInvalidOp, XrdCl::RedirectorRegistry::Get(), XrdCl::VirtualRedirector::HandleRequest(), XrdCl::RedirectorRegistry::Instance(), and XrdCl::stError.

Referenced by XrdCl::MessageUtils::RedirectMessage().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

Status XrdCl::PostMaster::RegisterEventHandler ( const URL url,
ChannelEventHandler handler 
)

Register channel event handler.

Definition at line 299 of file XrdClPostMaster.cc.

301  {
302  auto channel = pImpl->GetChannel( url );
303 
304  if( !channel )
305  return Status( stError, errNotSupported );
306 
307  channel->RegisterEventHandler( handler );
308  return Status();
309  }

References XrdCl::errNotSupported, and XrdCl::stError.

◆ Reinitialize()

bool XrdCl::PostMaster::Reinitialize ( )

Reinitialize after fork.

Definition at line 240 of file XrdClPostMaster.cc.

241  {
242  return true;
243  }

◆ RemoveEventHandler()

Status XrdCl::PostMaster::RemoveEventHandler ( const URL url,
ChannelEventHandler handler 
)

Remove a channel event handler.

Definition at line 314 of file XrdClPostMaster.cc.

316  {
317  auto channel = pImpl->GetChannel( url );
318 
319  if( !channel )
320  return Status( stError, errNotSupported );
321 
322  channel->RemoveEventHandler( handler );
323  return Status();
324  }

References XrdCl::errNotSupported, and XrdCl::stError.

◆ Send()

XRootDStatus XrdCl::PostMaster::Send ( const URL url,
Message msg,
MsgHandler handler,
bool  stateful,
time_t  expires 
)

Send the message asynchronously - the message is inserted into the send queue and a listener is called when the message is succesfsully pushed through the wire or when the timeout elapses

DEADLOCK WARNING: no lock should be taken while calling this method that are used in the callback as well.

Parameters
urlrecipient of the message
msgmessage to be sent
expiresunix timestamp after which a failure is reported to the handler
handlerhandler will be notified about the status
statefulphysical stream disconnection causes an error
Returns
success if the message was successfully inserted into the send queues, failure otherwise

Definition at line 248 of file XrdClPostMaster.cc.

253  {
254  auto channel = pImpl->GetChannel( url );
255 
256  if( !channel )
257  return XRootDStatus( stError, errNotSupported );
258 
259  return channel->Send( msg, handler, stateful, expires );
260  }

References XrdCl::errNotSupported, and XrdCl::stError.

Referenced by XrdCl::MessageUtils::SendMessage().

+ Here is the caller graph for this function:

◆ SetConnectionErrorHandler()

void XrdCl::PostMaster::SetConnectionErrorHandler ( std::function< void(const URL &, const XRootDStatus &)>  handler)

Set the global on-error on-connect handler for control streams.

Definition at line 443 of file XrdClPostMaster.cc.

444  {
445  XrdSysMutexHelper lck( pImpl->pMtx );
446  pImpl->pOnConnErrCB = std::move( handler );
447  }

◆ SetOnConnectHandler()

void XrdCl::PostMaster::SetOnConnectHandler ( std::unique_ptr< Job onConnJob)

Set the global connection error handler.

Set the global on-connect handler for control streams.

Definition at line 434 of file XrdClPostMaster.cc.

435  {
436  XrdSysMutexHelper lck( pImpl->pMtx );
437  pImpl->pOnConnJob = std::move( onConnJob );
438  }

Referenced by XrdPosixConfig::conTracker().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::PostMaster::SetOnDataConnectHandler ( const URL url,
std::shared_ptr< Job onConnJob 
)

Set the on-connect handler for data streams.

Definition at line 423 of file XrdClPostMaster.cc.

425  {
426  auto channel = pImpl->GetChannel( url );
427  if( !channel ) return;
428  channel->SetOnDataConnectHandler( onConnJob );
429  }

◆ Start()

bool XrdCl::PostMaster::Start ( )

Start the post master.

Definition at line 194 of file XrdClPostMaster.cc.

195  {
196  if( !pImpl->pInitialized )
197  return false;
198 
199  if( !pImpl->pPoller->Start() )
200  return false;
201 
202  if( !pImpl->pTaskManager->Start() )
203  {
204  pImpl->pPoller->Stop();
205  return false;
206  }
207 
208  if( !pImpl->pJobManager->Start() )
209  {
210  pImpl->pPoller->Stop();
211  pImpl->pTaskManager->Stop();
212  return false;
213  }
214 
215  pImpl->pRunning = true;
216  return true;
217  }

Referenced by XrdCl::ForkHandler::Child(), XrdCl::DefaultEnv::GetPostMaster(), and XrdCl::ForkHandler::Parent().

+ Here is the caller graph for this function:

◆ Stop()

bool XrdCl::PostMaster::Stop ( )

Stop the postmaster.

Definition at line 222 of file XrdClPostMaster.cc.

223  {
224  if( !pImpl->pInitialized || !pImpl->pRunning )
225  return true;
226 
227  if( !pImpl->pJobManager->Stop() )
228  return false;
229  if( !pImpl->pPoller->Stop() )
230  return false;
231  if( !pImpl->pTaskManager->Stop() )
232  return false;
233  pImpl->pRunning = false;
234  return true;
235  }

Referenced by main(), and XrdCl::ForkHandler::Prepare().

+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: