XRootD
XrdCl::PollerBuiltIn Class Reference

A poller implementation using the build-in XRootD poller. More...

#include <XrdClPollerBuiltIn.hh>

+ Inheritance diagram for XrdCl::PollerBuiltIn:
+ Collaboration diagram for XrdCl::PollerBuiltIn:

Public Member Functions

 PollerBuiltIn ()
 Constructor. More...
 
 ~PollerBuiltIn ()
 
virtual bool AddSocket (Socket *socket, SocketHandler *handler)
 
virtual bool EnableReadNotification (Socket *socket, bool notify, time_t timeout=60)
 
virtual bool EnableWriteNotification (Socket *socket, bool notify, time_t timeout=60)
 
virtual bool Finalize ()
 Finalize the poller. More...
 
virtual bool Initialize ()
 Initialize the poller. More...
 
virtual bool IsRegistered (Socket *socket)
 Check whether the socket is registered with the poller. More...
 
virtual bool IsRunning () const
 Is the event loop running? More...
 
virtual bool RemoveSocket (Socket *socket)
 Remove the socket. More...
 
virtual void ShutdownEvents (Socket *socket)
 
virtual bool Start ()
 Start polling. More...
 
virtual bool Stop ()
 Stop polling. More...
 
- Public Member Functions inherited from XrdCl::Poller
virtual ~Poller ()
 Destructor. More...
 

Detailed Description

A poller implementation using the build-in XRootD poller.

Definition at line 40 of file XrdClPollerBuiltIn.hh.

Constructor & Destructor Documentation

◆ PollerBuiltIn()

XrdCl::PollerBuiltIn::PollerBuiltIn ( )
inline

Constructor.

Definition at line 46 of file XrdClPollerBuiltIn.hh.

46 : pNbPoller( GetNbPollerInit() ){}

◆ ~PollerBuiltIn()

XrdCl::PollerBuiltIn::~PollerBuiltIn ( )
inline

Definition at line 48 of file XrdClPollerBuiltIn.hh.

48 {}

Member Function Documentation

◆ AddSocket()

bool XrdCl::PollerBuiltIn::AddSocket ( Socket socket,
SocketHandler handler 
)
virtual

Add socket to the polling loop

Parameters
socketthe socket
handlerobject handling the events

Implements XrdCl::Poller.

Definition at line 328 of file XrdClPollerBuiltIn.cc.

330  {
331  Log *log = DefaultEnv::GetLog();
332  XrdSysMutexHelper scopedLock( pMutex );
333 
334  if( !socket )
335  {
336  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
337  return false;
338  }
339 
340  if( socket->GetStatus() != Socket::Connected &&
341  socket->GetStatus() != Socket::Connecting )
342  {
343  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
344  return false;
345  }
346 
347  log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
348 
349  //--------------------------------------------------------------------------
350  // Check if the socket is already registered
351  //--------------------------------------------------------------------------
352  SocketMap::const_iterator it = pSocketMap.find( socket );
353  if( it != pSocketMap.end() )
354  {
355  log->Warning( PollerMsg, "%s Already registered with this poller",
356  socket->GetName().c_str() );
357  return false;
358  }
359 
360  //--------------------------------------------------------------------------
361  // Create the socket helper
362  //--------------------------------------------------------------------------
363  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
364 
365  if( !poller )
366  {
367  log->Error( PollerMsg, "No poller available, can not add socket" );
368  return false;
369  }
370 
371  PollerHelper *helper = new PollerHelper();
372  helper->callBack = new ::SocketCallBack( socket, handler );
373 
374  if( poller )
375  {
376  helper->channel = new XrdSys::IOEvents::Channel( poller,
377  socket->GetFD(),
378  helper->callBack );
379  }
380 
381  handler->Initialize( this );
382  pSocketMap[socket] = helper;
383  return true;
384  }
static Log * GetLog()
Get default log.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:56
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
const uint64_t PollerMsg

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::Socket::GetFD(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::Socket::GetStatus(), XrdCl::SocketHandler::Initialize(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableReadNotification()

bool XrdCl::PollerBuiltIn::EnableReadNotification ( Socket socket,
bool  notify,
time_t  timeout = 60 
)
virtual

Notify the handler about read events

Parameters
socketthe socket
notifyspecify if the handler should be notified
timeoutif no read event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 459 of file XrdClPollerBuiltIn.cc.

462  {
463  using namespace XrdSys::IOEvents;
464  Log *log = DefaultEnv::GetLog();
465 
466  if( !socket )
467  {
468  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
469  return false;
470  }
471 
472  //--------------------------------------------------------------------------
473  // Check if the socket is registered
474  //--------------------------------------------------------------------------
475  XrdSysMutexHelper scopedLock( pMutex );
476  SocketMap::const_iterator it = pSocketMap.find( socket );
477  if( it == pSocketMap.end() )
478  {
479  log->Warning( PollerMsg, "%s Socket is not registered",
480  socket->GetName().c_str() );
481  return false;
482  }
483 
484  PollerHelper *helper = (PollerHelper*)it->second;
485  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
486 
487  //--------------------------------------------------------------------------
488  // Enable read notifications
489  //--------------------------------------------------------------------------
490  if( notify )
491  {
492  if( helper->readEnabled )
493  return true;
494  helper->readTimeout = timeout;
495 
496  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %lld",
497  socket->GetName().c_str(), (long long)timeout );
498 
499  if( poller )
500  {
501  const char *errMsg;
502  bool status = helper->channel->Enable( Channel::readEvents, timeout,
503  &errMsg );
504  if( !status )
505  {
506  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
507  socket->GetName().c_str(), errMsg );
508  return false;
509  }
510  }
511  helper->readEnabled = true;
512  }
513 
514  //--------------------------------------------------------------------------
515  // Disable read notifications
516  //--------------------------------------------------------------------------
517  else
518  {
519  if( !helper->readEnabled )
520  return true;
521 
522  log->Dump( PollerMsg, "%s Disable read notifications",
523  socket->GetName().c_str() );
524 
525  if( poller )
526  {
527  const char *errMsg;
528  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
529  if( !status )
530  {
531  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
532  socket->GetName().c_str(), errMsg );
533  return false;
534  }
535  }
536  helper->readEnabled = false;
537  }
538  return true;
539  }
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ EnableWriteNotification()

bool XrdCl::PollerBuiltIn::EnableWriteNotification ( Socket socket,
bool  notify,
time_t  timeout = 60 
)
virtual

Notify the handler about write events

Parameters
socketthe socket
notifyspecify if the handler should be notified
timeoutif no write event occurred after this time a timeout event will be generated

Implements XrdCl::Poller.

Definition at line 544 of file XrdClPollerBuiltIn.cc.

547  {
548  using namespace XrdSys::IOEvents;
549  Log *log = DefaultEnv::GetLog();
550 
551  if( !socket )
552  {
553  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
554  return false;
555  }
556 
557  //--------------------------------------------------------------------------
558  // Check if the socket is registered
559  //--------------------------------------------------------------------------
560  XrdSysMutexHelper scopedLock( pMutex );
561  SocketMap::const_iterator it = pSocketMap.find( socket );
562  if( it == pSocketMap.end() )
563  {
564  log->Warning( PollerMsg, "%s Socket is not registered",
565  socket->GetName().c_str() );
566  return false;
567  }
568 
569  PollerHelper *helper = (PollerHelper*)it->second;
570  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
571 
572  //--------------------------------------------------------------------------
573  // Enable write notifications
574  //--------------------------------------------------------------------------
575  if( notify )
576  {
577  if( helper->writeEnabled )
578  return true;
579 
580  helper->writeTimeout = timeout;
581 
582  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %lld",
583  socket->GetName().c_str(), (long long)timeout );
584 
585  if( poller )
586  {
587  const char *errMsg;
588  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
589  &errMsg );
590  if( !status )
591  {
592  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
593  socket->GetName().c_str(), errMsg );
594  return false;
595  }
596  }
597  helper->writeEnabled = true;
598  }
599 
600  //--------------------------------------------------------------------------
601  // Disable read notifications
602  //--------------------------------------------------------------------------
603  else
604  {
605  if( !helper->writeEnabled )
606  return true;
607 
608  log->Dump( PollerMsg, "%s Disable write notifications",
609  socket->GetName().c_str() );
610  if( poller )
611  {
612  const char *errMsg;
613  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
614  if( !status )
615  {
616  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
617  socket->GetName().c_str(), errMsg );
618  return false;
619  }
620  }
621  helper->writeEnabled = false;
622  }
623  return true;
624  }

References XrdCl::Log::Dump(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ Finalize()

bool XrdCl::PollerBuiltIn::Finalize ( )
virtual

Finalize the poller.

Implements XrdCl::Poller.

Definition at line 179 of file XrdClPollerBuiltIn.cc.

180  {
181  //--------------------------------------------------------------------------
182  // Clean up the channels
183  //--------------------------------------------------------------------------
184  SocketMap::iterator it;
185  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
186  {
187  PollerHelper *helper = (PollerHelper*)it->second;
188  if( helper->channel ) helper->channel->Delete();
189  delete helper->callBack;
190  delete helper;
191  }
192  pSocketMap.clear();
193 
194  return true;
195  }

◆ Initialize()

bool XrdCl::PollerBuiltIn::Initialize ( )
virtual

Initialize the poller.

Implements XrdCl::Poller.

Definition at line 171 of file XrdClPollerBuiltIn.cc.

172  {
173  return true;
174  }

◆ IsRegistered()

bool XrdCl::PollerBuiltIn::IsRegistered ( Socket socket)
virtual

Check whether the socket is registered with the poller.

Implements XrdCl::Poller.

Definition at line 629 of file XrdClPollerBuiltIn.cc.

630  {
631  XrdSysMutexHelper scopedLock( pMutex );
632  SocketMap::iterator it = pSocketMap.find( socket );
633  return it != pSocketMap.end();
634  }

◆ IsRunning()

virtual bool XrdCl::PollerBuiltIn::IsRunning ( ) const
inlinevirtual

Is the event loop running?

Implements XrdCl::Poller.

Definition at line 123 of file XrdClPollerBuiltIn.hh.

124  {
125  return !pPollerPool.empty();
126  }

◆ RemoveSocket()

bool XrdCl::PollerBuiltIn::RemoveSocket ( Socket socket)
virtual

Remove the socket.

Implements XrdCl::Poller.

Definition at line 413 of file XrdClPollerBuiltIn.cc.

414  {
415  using namespace XrdSys::IOEvents;
416  Log *log = DefaultEnv::GetLog();
417 
418  //--------------------------------------------------------------------------
419  // Find the right socket
420  //--------------------------------------------------------------------------
421  XrdSysMutexHelper scopedLock( pMutex );
422  SocketMap::iterator it = pSocketMap.find( socket );
423  if( it == pSocketMap.end() )
424  return true;
425 
426  log->Debug( PollerMsg, "%s Removing socket from the poller",
427  socket->GetName().c_str() );
428 
429  // unregister from the poller it's currently associated with
430  UnregisterFromPoller( socket );
431 
432  //--------------------------------------------------------------------------
433  // Remove the socket
434  //--------------------------------------------------------------------------
435  PollerHelper *helper = (PollerHelper*)it->second;
436  pSocketMap.erase( it );
437  scopedLock.UnLock();
438 
439  if( helper->channel )
440  {
441  const char *errMsg;
442  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
443  if( !status )
444  {
445  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
446  socket->GetName().c_str(), errMsg );
447  return false;
448  }
449  helper->channel->Delete();
450  }
451  delete helper->callBack;
452  delete helper;
453  return true;
454  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdCl::PollerMsg, and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ ShutdownEvents()

void XrdCl::PollerBuiltIn::ShutdownEvents ( Socket socket)
virtual

Disables further callbacks to the socket's event handler. If callback is currently running wait for it, unless it is the current thread.

Implements XrdCl::Poller.

Definition at line 392 of file XrdClPollerBuiltIn.cc.

393  {
394  XrdSysMutexHelper scopedLock( pMutex );
395  SocketMap::iterator it = pSocketMap.find( socket );
396  if( it == pSocketMap.end() )
397  return;
398 
399  PollerHelper *helper = (PollerHelper*)it->second;
400  if( !helper ) return;
401  XrdSys::IOEvents::CallBack *cb = helper->callBack;
402  if( !cb ) return;
403  SocketCallBack *scb = dynamic_cast<SocketCallBack*>( cb );
404  if( !scb ) return;
405  auto dc = scb->GetControl();
406  scopedLock.UnLock();
407  dc->DisableCallBack();
408  }

References XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ Start()

bool XrdCl::PollerBuiltIn::Start ( )
virtual

Start polling.

Implements XrdCl::Poller.

Definition at line 200 of file XrdClPollerBuiltIn.cc.

201  {
202  //--------------------------------------------------------------------------
203  // Start the poller
204  //--------------------------------------------------------------------------
205  using namespace XrdSys;
206 
207  Log *log = DefaultEnv::GetLog();
208  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
209  XrdSysMutexHelper scopedLock( pMutex );
210  int errNum = 0;
211  const char *errMsg = 0;
212 
213  for( int i = 0; i < pNbPoller; ++i )
214  {
215  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
216  if( !poller )
217  {
218  log->Error( PollerMsg, "Unable to create the internal poller object: "
219  "%s (%s)", XrdSysE2T( errno ), errMsg );
220  return false;
221  }
222  pPollerPool.push_back( poller );
223  }
224 
225  pNext = pPollerPool.begin();
226 
227  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
228 
229  //--------------------------------------------------------------------------
230  // Check if we have any descriptors to reinsert from the last time we
231  // were started
232  //--------------------------------------------------------------------------
233  SocketMap::iterator it;
234  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235  {
236  PollerHelper *helper = (PollerHelper*)it->second;
237  Socket *socket = it->first;
238  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
239  helper->callBack );
240  if( helper->readEnabled )
241  {
242  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
243  helper->readTimeout, &errMsg );
244  if( !status )
245  {
246  log->Error( PollerMsg, "Unable to enable read notifications "
247  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
248 
249  return false;
250  }
251  }
252 
253  if( helper->writeEnabled )
254  {
255  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
256  helper->writeTimeout, &errMsg );
257  if( !status )
258  {
259  log->Error( PollerMsg, "Unable to enable write notifications "
260  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
261 
262  return false;
263  }
264  }
265  }
266  return true;
267  }
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
A network socket.
Definition: XrdClSocket.hh:43
bool Enable(int events, int timeout=0, const char **eText=0)

References Create, XrdCl::Log::Debug(), XrdSys::IOEvents::Channel::Enable(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::PollerMsg, and XrdSysE2T().

+ Here is the call graph for this function:

◆ Stop()

bool XrdCl::PollerBuiltIn::Stop ( )
virtual

Stop polling.

Implements XrdCl::Poller.

Definition at line 272 of file XrdClPollerBuiltIn.cc.

273  {
274  using namespace XrdSys::IOEvents;
275 
276  Log *log = DefaultEnv::GetLog();
277  log->Debug( PollerMsg, "Stopping the poller..." );
278 
279  XrdSysMutexHelper scopedLock( pMutex );
280 
281  if( pPollerPool.empty() )
282  {
283  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
284  return true;
285  }
286 
287  while( !pPollerPool.empty() )
288  {
289  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
290  if( *pNext == poller )
291  pNext = pPollerPool.begin();
292  pPollerPool.pop_back();
293 
294  if( !poller ) continue;
295 
296  scopedLock.UnLock();
297  poller->Stop();
298  delete poller;
299  scopedLock.Lock( &pMutex );
300  }
301  pNext = pPollerPool.end();
302  pPollerMap.clear();
303 
304  SocketMap::iterator it;
305  const char *errMsg = 0;
306 
307  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
308  {
309  PollerHelper *helper = (PollerHelper*)it->second;
310  if( !helper->channel ) continue;
311  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
312  if( !status )
313  {
314  Socket *socket = it->first;
315  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
316  socket->GetName().c_str(), errMsg );
317  }
318  helper->channel->Delete();
319  helper->channel = 0;
320  }
321 
322  return true;
323  }

References XrdCl::Log::Debug(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::Socket::GetName(), XrdSysMutexHelper::Lock(), XrdCl::PollerMsg, XrdSys::IOEvents::Poller::Stop(), and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: