XRootD
XrdClPollerBuiltIn.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClLog.hh"
27 #include "XrdCl/XrdClDefaultEnv.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClSocket.hh"
30 #include "XrdCl/XrdClOptimizers.hh"
31 #include "XrdSys/XrdSysE2T.hh"
32 #include "XrdSys/XrdSysIOEvents.hh"
33 
34 namespace
35 {
36  //----------------------------------------------------------------------------
37  // A helper struct passed to the callback as a custom arg
38  //----------------------------------------------------------------------------
39  struct PollerHelper
40  {
41  PollerHelper():
42  channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43  readTimeout(0), writeTimeout(0)
44  {}
47  bool readEnabled;
48  bool writeEnabled;
49  time_t readTimeout;
50  time_t writeTimeout;
51  };
52 
53  //----------------------------------------------------------------------------
54  // Call back implementation
55  //----------------------------------------------------------------------------
56  class SocketCallBack: public XrdSys::IOEvents::CallBack
57  {
58  public:
59  enum CallBackFlags
60  {
61  kRunningCallBack = 1,
62  kIdSet = 2,
63  kWantDisable = 4,
64  kDisabled = 8
65  };
66 
67  struct DisableControl
68  {
69  DisableControl() : pFlags( 0 ), pCnd( 0 ) { }
70  ~DisableControl() { }
71 
72  //------------------------------------------------------------------------
73  // Want to disable further callbacks. If callback is currently running
74  // in a different thread from our caller, wait until callback finishes.
75  //------------------------------------------------------------------------
76  void DisableCallBack()
77  {
78  const int flags = pFlags.fetch_or( kWantDisable );
79  if( !(flags & kIdSet) ) return;
80  if( !(flags & kRunningCallBack) ) return;
81  XrdSysCondVarHelper lck( pCnd );
82  if( XrdSysThread::Same( XrdSysThread::ID(), pSelfId ) ) return;
83  while( !(pFlags.load() & kDisabled) ) pCnd.Wait();
84  }
85 
86  std::atomic<int> pFlags;
87  XrdSysCondVar pCnd;
88  pthread_t pSelfId;
89  };
90 
91  SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
92  pSocket( sock ), pHandler( sh )
93  {
94  pControl = std::make_shared<DisableControl>();
95  }
96 
97  virtual ~SocketCallBack() {};
98 
99  virtual bool Event( XrdSys::IOEvents::Channel *chP,
100  void *cbArg,
101  int evFlags )
102  {
103  using namespace XrdCl;
104  uint8_t ev = 0;
105 
106  if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
107  if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
108  if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
109  if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
110 
111  Log *log = DefaultEnv::GetLog();
112  if( unlikely(log->GetLevel() >= Log::DumpMsg) )
113  {
114  log->Dump( PollerMsg, "%s Got an event: %s",
115  pSocket->GetName().c_str(),
116  SocketHandler::EventTypeToString( ev ).c_str() );
117  }
118 
119  int flags = pControl->pFlags.fetch_or( kRunningCallBack );
120  if( !( flags & kIdSet ) )
121  {
122  XrdSysCondVarHelper lck( pControl->pCnd );
123  pControl->pSelfId = XrdSysThread::ID();
124  flags = pControl->pFlags.fetch_or( kIdSet );
125  }
126  if( flags & kWantDisable )
127  {
128  XrdSysCondVarHelper lck( pControl->pCnd );
129  pControl->pFlags &= ~kRunningCallBack;
130  pControl->pFlags |= kDisabled;
131  pControl->pCnd.Broadcast();
132  return false;
133  }
134 
135  //----------------------------------------------------------------------
136  // If the event handler calls RemoveSocket for itself this object may
137  // be destroyed during the Event call.
138  //----------------------------------------------------------------------
139  auto control = pControl;
140  pHandler->Event( ev, pSocket );
141 
142  flags = control->pFlags.fetch_and( ~kRunningCallBack );
143  if( flags & kWantDisable )
144  {
145  XrdSysCondVarHelper lck( control->pCnd );
146  control->pFlags |= kDisabled;
147  control->pCnd.Broadcast();
148  return false;
149  }
150  return true;
151  }
152 
153  std::shared_ptr<DisableControl> GetControl()
154  {
155  return pControl;
156  }
157 
158  private:
159  XrdCl::Socket *pSocket;
160  XrdCl::SocketHandler *pHandler;
161  std::shared_ptr<DisableControl> pControl;
162  };
163 }
164 
165 
166 namespace XrdCl
167 {
168  //----------------------------------------------------------------------------
169  // Initialize the poller
170  //----------------------------------------------------------------------------
172  {
173  return true;
174  }
175 
176  //----------------------------------------------------------------------------
177  // Finalize the poller
178  //----------------------------------------------------------------------------
180  {
181  //--------------------------------------------------------------------------
182  // Clean up the channels
183  //--------------------------------------------------------------------------
184  SocketMap::iterator it;
185  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
186  {
187  PollerHelper *helper = (PollerHelper*)it->second;
188  if( helper->channel ) helper->channel->Delete();
189  delete helper->callBack;
190  delete helper;
191  }
192  pSocketMap.clear();
193 
194  return true;
195  }
196 
197  //------------------------------------------------------------------------
198  // Start polling
199  //------------------------------------------------------------------------
201  {
202  //--------------------------------------------------------------------------
203  // Start the poller
204  //--------------------------------------------------------------------------
205  using namespace XrdSys;
206 
207  Log *log = DefaultEnv::GetLog();
208  log->Debug( PollerMsg, "Creating and starting the built-in poller..." );
209  XrdSysMutexHelper scopedLock( pMutex );
210  int errNum = 0;
211  const char *errMsg = 0;
212 
213  for( int i = 0; i < pNbPoller; ++i )
214  {
215  XrdSys::IOEvents::Poller* poller = IOEvents::Poller::Create( errNum, &errMsg );
216  if( !poller )
217  {
218  log->Error( PollerMsg, "Unable to create the internal poller object: "
219  "%s (%s)", XrdSysE2T( errno ), errMsg );
220  return false;
221  }
222  pPollerPool.push_back( poller );
223  }
224 
225  pNext = pPollerPool.begin();
226 
227  log->Debug( PollerMsg, "Using %d poller threads", pNbPoller );
228 
229  //--------------------------------------------------------------------------
230  // Check if we have any descriptors to reinsert from the last time we
231  // were started
232  //--------------------------------------------------------------------------
233  SocketMap::iterator it;
234  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
235  {
236  PollerHelper *helper = (PollerHelper*)it->second;
237  Socket *socket = it->first;
238  helper->channel = new IOEvents::Channel( RegisterAndGetPoller( socket ), socket->GetFD(),
239  helper->callBack );
240  if( helper->readEnabled )
241  {
242  bool status = helper->channel->Enable( IOEvents::Channel::readEvents,
243  helper->readTimeout, &errMsg );
244  if( !status )
245  {
246  log->Error( PollerMsg, "Unable to enable read notifications "
247  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
248 
249  return false;
250  }
251  }
252 
253  if( helper->writeEnabled )
254  {
255  bool status = helper->channel->Enable( IOEvents::Channel::writeEvents,
256  helper->writeTimeout, &errMsg );
257  if( !status )
258  {
259  log->Error( PollerMsg, "Unable to enable write notifications "
260  "while re-starting %s (%s)", XrdSysE2T( errno ), errMsg );
261 
262  return false;
263  }
264  }
265  }
266  return true;
267  }
268 
269  //------------------------------------------------------------------------
270  // Stop polling
271  //------------------------------------------------------------------------
273  {
274  using namespace XrdSys::IOEvents;
275 
276  Log *log = DefaultEnv::GetLog();
277  log->Debug( PollerMsg, "Stopping the poller..." );
278 
279  XrdSysMutexHelper scopedLock( pMutex );
280 
281  if( pPollerPool.empty() )
282  {
283  log->Debug( PollerMsg, "Stopping a poller that has not been started" );
284  return true;
285  }
286 
287  while( !pPollerPool.empty() )
288  {
289  XrdSys::IOEvents::Poller *poller = pPollerPool.back();
290  if( *pNext == poller )
291  pNext = pPollerPool.begin();
292  pPollerPool.pop_back();
293 
294  if( !poller ) continue;
295 
296  scopedLock.UnLock();
297  poller->Stop();
298  delete poller;
299  scopedLock.Lock( &pMutex );
300  }
301  pNext = pPollerPool.end();
302  pPollerMap.clear();
303 
304  SocketMap::iterator it;
305  const char *errMsg = 0;
306 
307  for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
308  {
309  PollerHelper *helper = (PollerHelper*)it->second;
310  if( !helper->channel ) continue;
311  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
312  if( !status )
313  {
314  Socket *socket = it->first;
315  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
316  socket->GetName().c_str(), errMsg );
317  }
318  helper->channel->Delete();
319  helper->channel = 0;
320  }
321 
322  return true;
323  }
324 
325  //------------------------------------------------------------------------
326  // Add socket to the polling queue
327  //------------------------------------------------------------------------
329  SocketHandler *handler )
330  {
331  Log *log = DefaultEnv::GetLog();
332  XrdSysMutexHelper scopedLock( pMutex );
333 
334  if( !socket )
335  {
336  log->Error( PollerMsg, "Invalid socket, impossible to poll" );
337  return false;
338  }
339 
340  if( socket->GetStatus() != Socket::Connected &&
341  socket->GetStatus() != Socket::Connecting )
342  {
343  log->Error( PollerMsg, "Socket is not in a state valid for polling" );
344  return false;
345  }
346 
347  log->Debug( PollerMsg, "Adding socket %p to the poller", (void*)socket );
348 
349  //--------------------------------------------------------------------------
350  // Check if the socket is already registered
351  //--------------------------------------------------------------------------
352  SocketMap::const_iterator it = pSocketMap.find( socket );
353  if( it != pSocketMap.end() )
354  {
355  log->Warning( PollerMsg, "%s Already registered with this poller",
356  socket->GetName().c_str() );
357  return false;
358  }
359 
360  //--------------------------------------------------------------------------
361  // Create the socket helper
362  //--------------------------------------------------------------------------
363  XrdSys::IOEvents::Poller* poller = RegisterAndGetPoller( socket );
364 
365  if( !poller )
366  {
367  log->Error( PollerMsg, "No poller available, can not add socket" );
368  return false;
369  }
370 
371  PollerHelper *helper = new PollerHelper();
372  helper->callBack = new ::SocketCallBack( socket, handler );
373 
374  if( poller )
375  {
376  helper->channel = new XrdSys::IOEvents::Channel( poller,
377  socket->GetFD(),
378  helper->callBack );
379  }
380 
381  handler->Initialize( this );
382  pSocketMap[socket] = helper;
383  return true;
384  }
385 
386  //----------------------------------------------------------------------------
387  // This disables further callbacks to the socket's handler from the poller
388  // thread. Will wait until any current callback completes (unless our caller
389  // is the same thread). We do not yet remove the socket from the poller.
390  // Removal may still block until the relevant poller thread can run.
391  //----------------------------------------------------------------------------
393  {
394  XrdSysMutexHelper scopedLock( pMutex );
395  SocketMap::iterator it = pSocketMap.find( socket );
396  if( it == pSocketMap.end() )
397  return;
398 
399  PollerHelper *helper = (PollerHelper*)it->second;
400  if( !helper ) return;
401  XrdSys::IOEvents::CallBack *cb = helper->callBack;
402  if( !cb ) return;
403  SocketCallBack *scb = dynamic_cast<SocketCallBack*>( cb );
404  if( !scb ) return;
405  auto dc = scb->GetControl();
406  scopedLock.UnLock();
407  dc->DisableCallBack();
408  }
409 
410  //------------------------------------------------------------------------
411  // Remove the socket
412  //------------------------------------------------------------------------
414  {
415  using namespace XrdSys::IOEvents;
416  Log *log = DefaultEnv::GetLog();
417 
418  //--------------------------------------------------------------------------
419  // Find the right socket
420  //--------------------------------------------------------------------------
421  XrdSysMutexHelper scopedLock( pMutex );
422  SocketMap::iterator it = pSocketMap.find( socket );
423  if( it == pSocketMap.end() )
424  return true;
425 
426  log->Debug( PollerMsg, "%s Removing socket from the poller",
427  socket->GetName().c_str() );
428 
429  // unregister from the poller it's currently associated with
430  UnregisterFromPoller( socket );
431 
432  //--------------------------------------------------------------------------
433  // Remove the socket
434  //--------------------------------------------------------------------------
435  PollerHelper *helper = (PollerHelper*)it->second;
436  pSocketMap.erase( it );
437  scopedLock.UnLock();
438 
439  if( helper->channel )
440  {
441  const char *errMsg;
442  bool status = helper->channel->Disable( Channel::allEvents, &errMsg );
443  if( !status )
444  {
445  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
446  socket->GetName().c_str(), errMsg );
447  return false;
448  }
449  helper->channel->Delete();
450  }
451  delete helper->callBack;
452  delete helper;
453  return true;
454  }
455 
456  //----------------------------------------------------------------------------
457  // Notify the handler about read events
458  //----------------------------------------------------------------------------
460  bool notify,
461  time_t timeout )
462  {
463  using namespace XrdSys::IOEvents;
464  Log *log = DefaultEnv::GetLog();
465 
466  if( !socket )
467  {
468  log->Error( PollerMsg, "Invalid socket, read events unavailable" );
469  return false;
470  }
471 
472  //--------------------------------------------------------------------------
473  // Check if the socket is registered
474  //--------------------------------------------------------------------------
475  XrdSysMutexHelper scopedLock( pMutex );
476  SocketMap::const_iterator it = pSocketMap.find( socket );
477  if( it == pSocketMap.end() )
478  {
479  log->Warning( PollerMsg, "%s Socket is not registered",
480  socket->GetName().c_str() );
481  return false;
482  }
483 
484  PollerHelper *helper = (PollerHelper*)it->second;
485  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
486 
487  //--------------------------------------------------------------------------
488  // Enable read notifications
489  //--------------------------------------------------------------------------
490  if( notify )
491  {
492  if( helper->readEnabled )
493  return true;
494  helper->readTimeout = timeout;
495 
496  log->Dump( PollerMsg, "%s Enable read notifications, timeout: %lld",
497  socket->GetName().c_str(), (long long)timeout );
498 
499  if( poller )
500  {
501  const char *errMsg;
502  bool status = helper->channel->Enable( Channel::readEvents, timeout,
503  &errMsg );
504  if( !status )
505  {
506  log->Error( PollerMsg, "%s Unable to enable read notifications: %s",
507  socket->GetName().c_str(), errMsg );
508  return false;
509  }
510  }
511  helper->readEnabled = true;
512  }
513 
514  //--------------------------------------------------------------------------
515  // Disable read notifications
516  //--------------------------------------------------------------------------
517  else
518  {
519  if( !helper->readEnabled )
520  return true;
521 
522  log->Dump( PollerMsg, "%s Disable read notifications",
523  socket->GetName().c_str() );
524 
525  if( poller )
526  {
527  const char *errMsg;
528  bool status = helper->channel->Disable( Channel::readEvents, &errMsg );
529  if( !status )
530  {
531  log->Error( PollerMsg, "%s Unable to disable read notifications: %s",
532  socket->GetName().c_str(), errMsg );
533  return false;
534  }
535  }
536  helper->readEnabled = false;
537  }
538  return true;
539  }
540 
541  //----------------------------------------------------------------------------
542  // Notify the handler about write events
543  //----------------------------------------------------------------------------
545  bool notify,
546  time_t timeout )
547  {
548  using namespace XrdSys::IOEvents;
549  Log *log = DefaultEnv::GetLog();
550 
551  if( !socket )
552  {
553  log->Error( PollerMsg, "Invalid socket, write events unavailable" );
554  return false;
555  }
556 
557  //--------------------------------------------------------------------------
558  // Check if the socket is registered
559  //--------------------------------------------------------------------------
560  XrdSysMutexHelper scopedLock( pMutex );
561  SocketMap::const_iterator it = pSocketMap.find( socket );
562  if( it == pSocketMap.end() )
563  {
564  log->Warning( PollerMsg, "%s Socket is not registered",
565  socket->GetName().c_str() );
566  return false;
567  }
568 
569  PollerHelper *helper = (PollerHelper*)it->second;
570  XrdSys::IOEvents::Poller *poller = GetPoller( socket );
571 
572  //--------------------------------------------------------------------------
573  // Enable write notifications
574  //--------------------------------------------------------------------------
575  if( notify )
576  {
577  if( helper->writeEnabled )
578  return true;
579 
580  helper->writeTimeout = timeout;
581 
582  log->Dump( PollerMsg, "%s Enable write notifications, timeout: %lld",
583  socket->GetName().c_str(), (long long)timeout );
584 
585  if( poller )
586  {
587  const char *errMsg;
588  bool status = helper->channel->Enable( Channel::writeEvents, timeout,
589  &errMsg );
590  if( !status )
591  {
592  log->Error( PollerMsg, "%s Unable to enable write notifications: %s",
593  socket->GetName().c_str(), errMsg );
594  return false;
595  }
596  }
597  helper->writeEnabled = true;
598  }
599 
600  //--------------------------------------------------------------------------
601  // Disable read notifications
602  //--------------------------------------------------------------------------
603  else
604  {
605  if( !helper->writeEnabled )
606  return true;
607 
608  log->Dump( PollerMsg, "%s Disable write notifications",
609  socket->GetName().c_str() );
610  if( poller )
611  {
612  const char *errMsg;
613  bool status = helper->channel->Disable( Channel::writeEvents, &errMsg );
614  if( !status )
615  {
616  log->Error( PollerMsg, "%s Unable to disable write notifications: %s",
617  socket->GetName().c_str(), errMsg );
618  return false;
619  }
620  }
621  helper->writeEnabled = false;
622  }
623  return true;
624  }
625 
626  //----------------------------------------------------------------------------
627  // Check whether the socket is registered with the poller
628  //----------------------------------------------------------------------------
630  {
631  XrdSysMutexHelper scopedLock( pMutex );
632  SocketMap::iterator it = pSocketMap.find( socket );
633  return it != pSocketMap.end();
634  }
635 
636  //----------------------------------------------------------------------------
637  // Return poller threads in round-robin fashion
638  //----------------------------------------------------------------------------
639  XrdSys::IOEvents::Poller* PollerBuiltIn::GetNextPoller()
640  {
641  if( pPollerPool.empty() ) return 0;
642 
643  PollerPool::iterator ret = pNext;
644  ++pNext;
645  if( pNext == pPollerPool.end() )
646  pNext = pPollerPool.begin();
647  return *ret;
648  }
649 
650  //----------------------------------------------------------------------------
651  // Return the poller associated with the respective channel
652  //----------------------------------------------------------------------------
653  XrdSys::IOEvents::Poller* PollerBuiltIn::RegisterAndGetPoller(const Socket * socket)
654  {
655  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
656 
657  if( itr == pPollerMap.end() )
658  {
659  XrdSys::IOEvents::Poller* poller = GetNextPoller();
660  if( poller )
661  pPollerMap[socket->GetFD()] = poller;
662  return poller;
663  }
664 
665  return itr->second;
666  }
667 
668  void PollerBuiltIn::UnregisterFromPoller( const Socket *socket )
669  {
670  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
671  if( itr == pPollerMap.end() ) return;
672  pPollerMap.erase( itr );
673  }
674 
675  XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(const Socket * socket)
676  {
677  PollerMap::iterator itr = pPollerMap.find( socket->GetFD() );
678  if( itr == pPollerMap.end() ) return 0;
679  return itr->second;
680  }
681 
682  //----------------------------------------------------------------------------
683  // Get the initial value for pNbPoller
684  //----------------------------------------------------------------------------
685  int PollerBuiltIn::GetNbPollerInit()
686  {
687  Env * env = DefaultEnv::GetEnv();
689  env->GetInt("ParallelEvtLoop", ret);
690  return ret;
691  }
692 }
#define unlikely(x)
bool Create
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
LogLevel GetLevel() const
Get the log level.
Definition: XrdClLog.hh:258
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool EnableReadNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool Stop()
Stop polling.
virtual void ShutdownEvents(Socket *socket)
virtual bool EnableWriteNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
Definition: XrdClPoller.hh:56
A network socket.
Definition: XrdClSocket.hh:43
std::string GetName() const
Get the string representation of the socket.
Definition: XrdClSocket.cc:672
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
int GetFD() const
Get the file descriptor.
Definition: XrdClSocket.hh:214
SocketStatus GetStatus() const
Get the socket status.
Definition: XrdClSocket.hh:125
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static pthread_t ID(void)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
bool Enable(int events, int timeout=0, const char **eText=0)
const uint64_t PollerMsg
const int DefaultParallelEvtLoop