XRootD
XrdSysIOEvents.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d S y s I O E v e n t s . c c */
4 /* */
5 /* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Andrew Hanushevsky for Stanford University under contract */
8 /* DE-AC02-76-SFO0515 with the Department of Energy */
9 /* */
10 /* This file is part of the XRootD software suite. */
11 /* */
12 /* XRootD is free software: you can redistribute it and/or modify it under */
13 /* the terms of the GNU Lesser General Public License as published by the */
14 /* Free Software Foundation, either version 3 of the License, or (at your */
15 /* option) any later version. */
16 /* */
17 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20 /* License for more details. */
21 /* */
22 /* You should have received a copy of the GNU Lesser General Public License */
23 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25 /* */
26 /* The copyright holder's institutional names and contributor's names may not */
27 /* be used to endorse or promote products derived from this software without */
28 /* specific prior written permission of the institution or contributor. */
29 /******************************************************************************/
30 
31 #include <cstdio>
32 #include <cstdlib>
33 
34 #include "XrdSys/XrdSysE2T.hh"
35 #include "XrdSys/XrdSysFD.hh"
36 #include "XrdSys/XrdSysIOEvents.hh"
37 #include "XrdSys/XrdSysHeaders.hh"
38 #include "XrdSys/XrdSysPlatform.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
41 /******************************************************************************/
42 /* L o c a l D a t a */
43 /******************************************************************************/
44 
45 namespace
46 {
47 // Status code to name array corresponding to:
48 // enum Status {isClear = 0, isCBMode, isDead};
49 //
50  const char *statName[] = {"isClear", "isCBMode", "isDead"};
51 }
52 
53 /******************************************************************************/
54 /* L o c a l D e f i n e s */
55 /******************************************************************************/
56 
57 #define STATUS statName[(int)chStat]
58 
59 #define STATUSOF(x) statName[(int)(x->chStat)]
60 
61 #define SINGLETON(dlvar, theitem)\
62  theitem ->dlvar .next == theitem
63 
64 #define INSERT(dlvar, curitem, newitem) \
65  newitem ->dlvar .next = curitem; \
66  newitem ->dlvar .prev = curitem ->dlvar .prev; \
67  curitem ->dlvar .prev-> dlvar .next = newitem; \
68  curitem ->dlvar .prev = newitem
69 
70 #define REMOVE(dlbase, dlvar, curitem) \
71  if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
72  ? 0 : curitem ->dlvar .next);\
73  curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
74  curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
75  curitem ->dlvar .next = curitem;\
76  curitem ->dlvar .prev = curitem
77 
// Extract the read-related (REVENTS) or write-related (WEVENTS) bits from an
// event mask. Both the argument and the whole expansion are parenthesized so
// that the macros compose safely with other operators, e.g. !REVENTS(x) or
// REVENTS(a | b), instead of silently binding to the wrong operand.
//
#define REVENTS(x) ((x) & Channel::readEvents)

#define WEVENTS(x) ((x) & Channel::writeEvents)
81 
82 #define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)
83 
84 #define BOOLNAME(x) (x ? "true" : "false")
85 
86 #define DO_TRACE(x,fd,y) \
87  {PollerInit::traceMTX.Lock(); \
88  std::cerr <<"IOE fd "<<fd<<' '<<#x <<": "<<y<<'\n'<< std::flush; \
89  PollerInit::traceMTX.UnLock();}
90 
91 #define TRACING PollerInit::doTrace
92 
93 #define IF_TRACE(x,fd,y) if (TRACING) DO_TRACE(x,fd,y)
94 
95 #define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")
96 
97 #define TRACE_MOD(x,fd,y) \
98  IF_TRACE(x,fd,"Modify(" <<y <<") == " \
99  <<BOOLNAME(retval) <<TRACE_LOK)
100 
101 #define TRACE_NOD(x,fd,y) \
102  IF_TRACE(x,fd,"Modify(" <<y <<") skipped; no events changed")
103 
104 /******************************************************************************/
105 /* G l o b a l D a t a */
106 /******************************************************************************/
107 
109  = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);
110 
111  pid_t XrdSys::IOEvents::Poller::parentPID = getpid();
112 
113 /******************************************************************************/
114 /* L o c a l C l a s s e s */
115 /******************************************************************************/
116 /******************************************************************************/
117 /* T h r e a d S t a r t u p I n t e r f a c e */
118 /******************************************************************************/
119 
120 namespace XrdSys
121 {
122 namespace IOEvents
123 {
124 struct pollArg
126  const char *retMsg;
127  int retCode;
129 
130  pollArg() : retMsg(0), retCode(0)
131  {pollSync = new XrdSysSemaphore(0, "poll sync");}
132  ~pollArg() {}
133  };
134 
136 {
137 public:
138 
139 static void *Start(void *parg);
140 };
141 
142 void *BootStrap::Start(void *parg)
143 {
144  struct pollArg *pollArg = (struct pollArg *)parg;
145  Poller *thePoller = pollArg->pollP;
146  XrdSysSemaphore *theSem = pollArg->pollSync;
147  thePoller->pollTid = XrdSysThread::ID();
148 
149  thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg));
150  delete theSem;
151 
152  return (void *)0;
153 }
154 
155 /******************************************************************************/
156 /* P o l l e r E r r 1 */
157 /******************************************************************************/
158 
159 // This class is set in the channel when an error occurs but cannot be reflected
160 // immediately because either there is no callback function or all events are
161 // disabled. We need to do this because error events can be physically presented
162 // by the kernel even when logical events are disabled. Note that the error
163 // number and text will have been set and remain set as the channel was actually
164 // disabled preventing any new operation on the channel.
165 //
166 class PollerErr1 : public Poller
167 {
168 public:
169 
170  PollerErr1() : Poller(-1, -1) {}
172 
173 protected:
174 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
175  {(void)syncp; (void)rc; (void)eTxt;}
176 
177 void Exclude(Channel *cP, bool &isLocked, bool dover=1)
178  {(void)cP; (void)isLocked; (void)dover;}
179 
180 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
181  {(void)isLocked;
182  if (!(eNum = GetFault(cP))) eNum = EPROTO;
183  if (eTxt) *eTxt = "initializing channel";
184  return false;
185  }
186 
187 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
188  {(void)isLocked;
189  if (!(eNum = GetFault(cP))) eNum = EPROTO;
190  if (eTxt) *eTxt = "modifying channel";
191  return false;
192  }
193 
194 void Shutdown() {}
195 };
196 
197 /******************************************************************************/
198 /* P o l l e r I n i t */
199 /******************************************************************************/
200 
201 // This class is used as the initial poller on a channel. It is responsible
202 // for adding the file descriptor to the poll set upon the initial enable. This
203 // avoids enabling a channel prior to it receiving a call back function.
204 //
205 class PollerInit : public Poller
206 {
207 public:
208 
209  PollerInit() : Poller(-1, -1) {}
211 
213 static bool doTrace;
214 
215 protected:
216 
217 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
218 
219 void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
220 
221 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
222  {eNum = EPROTO;
223  if (eTxt) *eTxt = "initializing channel";
224  return false;
225  }
226 
227 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
228  {bool rc = Init(cP, eNum, eTxt, isLocked);
229  IF_TRACE(Modify,cP->GetFD(), "Init() returned " <<BOOLNAME(rc));
230  return rc;
231  }
232 
233 void Shutdown() {}
234 };
235 
236 bool PollerInit::doTrace = (getenv("XrdSysIOE_TRACE") != 0);
238 
239 /******************************************************************************/
240 /* P o l l e r W a i t */
241 /******************************************************************************/
242 
243 // This class is set in the channel when we need to serialize access to the
244 // channel. Channel methods (as some others) check for this to see if they need
245 // to defer the current operation. We need to do this because some poller
246 // implementations must release the channel lock to avoid a deadlock.
247 //
248 class PollerWait : public Poller
249 {
250 public:
251 
252  PollerWait() : Poller(-1, -1) {}
254 
255 protected:
256 void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
257 
258 void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
259 
260 bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
261  {eNum = EIDRM;
262  if (eTxt) *eTxt = "initializing channel";
263  return false;
264  }
265 
266 bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
267  {return Init(cP, eNum, eTxt, isLocked);}
268 
269 void Shutdown() {}
270 };
271 
275 };
276 };
277 
278 /******************************************************************************/
279 /* C l a s s C h a n n e l M e t h o d s */
280 /******************************************************************************/
281 /******************************************************************************/
282 /* C o n s t r u c t o r */
283 /******************************************************************************/
284 
286  CallBack *cbP, void *cbArg)
287  : chPollXQ(pollP), chCB(cbP), chCBA(cbArg)
288 {
289  attList.next = attList.prev = this;
290  tmoList.next = tmoList.prev = this;
291  inTOQ = 0;
292  pollEnt = 0;
293  chStat = isClear;
294  Reset(&pollInit, fd);
295 
296  pollP->Attach(this);
297 }
298 
299 /******************************************************************************/
300 /* D e l e t e */
301 /******************************************************************************/
302 
304 {
305  Poller *myPoller;
306  bool isLocked = true;
307 
308 // Do some tracing
309 //
310  IF_TRACE(Delete,chFD,"status="<<STATUS);
311 
312 // Lock ourselves during the delete process. If the channel is disassociated
313 // or the real poller is set to the error poller then this channel is clean
314 // and can be deleted (i.e. the channel ran through Detach()).
315 //
316  chMutex.Lock();
317  if (!chPollXQ || chPollXQ == &pollErr1)
318  {chMutex.UnLock();
319  delete this;
320  return;
321  }
322 
323 // Disable and remove ourselves from all queues
324 //
325  myPoller = chPollXQ;
326  chPollXQ->Detach(this,isLocked,false);
327  if (!isLocked) chMutex.Lock();
328 
329 // If we are in callback mode then we will need to delay the destruction until
330 // after the callback completes unless this is the poller thread. In that case,
331 // we need to tell the poller that we have been destroyed in a shelf-stable way.
332 //
333  if (chStat)
334  {if (XrdSysThread::Same(XrdSysThread::ID(),myPoller->pollTid))
335  {myPoller->chDead = true;
336  chMutex.UnLock();
337  } else {
338  XrdSysSemaphore cbDone(0);
339  IF_TRACE(Delete,chFD,"waiting for callback");
340  chStat = isDead;
341  chCBA = (void *)&cbDone;
342  chMutex.UnLock();
343  cbDone.Wait();
344  }
345  } else chMutex.UnLock();
346 
347 // It is now safe to release the storage
348 //
349  IF_TRACE(Delete,chFD,"chan="<< std::hex<<(void *)this<< std::dec);
350  delete this;
351 }
352 
353 /******************************************************************************/
354 /* D i s a b l e */
355 /******************************************************************************/
356 
357 bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
358 {
359  int eNum = 0, newev, curev;
360  bool retval = true, isLocked = true;
361 
362 // Lock this channel
363 //
364  chMutex.Lock();
365 
366 // Get correct current events; depending on the state of the channel
367 //
368  if (chPoller == &pollWait) curev = static_cast<int>(reMod);
369  else curev = static_cast<int>(chEvents);
370 
371 // Trace this entry
372 //
373  IF_TRACE(Disable,chFD,"->Disable(" <<events <<") chev=" <<curev);
374 
375 // Calculate new event mask
376 //
377  events &= allEvents;
378  newev = curev & ~events;
379 
380 // If something has changed, then modify the event mask in the poller. The
381 // poller may or may not unlock this channel during the process.
382 //
383  if (newev != curev)
384  {chEvents = newev;
385  retval = chPoller->Modify(this, eNum, eText, isLocked);
386  TRACE_MOD(Disable,chFD,newev);
387  } else {
388  TRACE_NOD(Disable,chFD,newev);
389  }
390  if (isLocked) chMutex.UnLock();
391 
392 // All done
393 //
394  if (!retval) errno = eNum;
395  return retval;
396 }
397 
398 /******************************************************************************/
399 /* E n a b l e */
400 /******************************************************************************/
401 
402 bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
403  const char **eText)
404 {
405  int eNum = 0, newev, curev, tmoSet = 0;
406  bool retval, setTO, isLocked = true;
407 
408 // Lock ourselves against any changes (this is a recursive mutex)
409 //
410  chMutex.Lock();
411 
412 // Get correct current events; depending on the state of the channel
413 //
414  if (chPoller == &pollWait) curev = static_cast<int>(reMod);
415  else curev = static_cast<int>(chEvents);
416 
417 // Trace this entry
418 //
419  IF_TRACE(Enable,chFD,"->Enable("<<events<<','<<timeout<<") chev="<<curev);
420 
421 // Establish events that should be enabled
422 //
423  events &= allEvents;
424  newev = (curev ^ events) & events;
425  chEvents = curev | events;
426 
427 // Handle timeout changes now
428 //
429  if (REVENTS(events))
430  { if (timeout > 0) chRTO = timeout;
431  else if (timeout < 0) chRTO = 0;
432  if (rdDL != Poller::maxTime || chRTO) tmoSet |= CallBack::ReadyToRead;
433  }
434 
435  if (WEVENTS(events))
436  { if (timeout > 0) chWTO = timeout;
437  else if (timeout < 0) chWTO = 0;
438  if (wrDL != Poller::maxTime || chWTO) tmoSet |= CallBack::ReadyToWrite;
439  }
440 
441 // Check if we have to reset the timeout. We need to hold the channel lock here.
442 //
443  if (tmoSet && chPoller != &pollErr1)
444  setTO = chPollXQ->TmoAdd(this, tmoSet);
445  else setTO = false;
446 
447 // Check if any modifcations needed here. If so, invoke the modifier. Note that
448 // the modify will unlock the channel if the operation causes a wait. So,
449 // we cannot depend on the channel being locked upon return. The reason we do
450 // not unlock here is because we must ensure the channel doesn't change while
451 // we call modify. We let modify determine what to do.
452 //
453  if (newev)
454  {retval = chPoller->Modify(this, eNum, eText, isLocked);
455  TRACE_MOD(Enable,chFD,(curev | events));
456  } else {
457  retval = true;
458  TRACE_NOD(Enable,chFD,(curev | events));
459  }
460 
461 // We need to notify the poller thread if the added deadline is the first in the
462 // queue and the poller is waiting. We also optimize for the case where the
463 // poller thread is always woken up to perform an action in which case it
464 // doesn't need a separate wakeup. We only do this if the enable succeeed. Note
465 // that we cannot hold the channel mutex for this call because it may wait.
466 //
467  if (isLocked) chMutex.UnLock();
468  bool isWakePend = CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
469  if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();
470 
471 // All done
472 //
473  if (!retval) errno = eNum;
474  return retval;
475 }
476 
477 /******************************************************************************/
478 /* G e t C a l l B a c k */
479 /******************************************************************************/
480 
482 {
483  chMutex.Lock();
484  *cbP = chCB;
485  *cbArg = chCBA;
486  chMutex.UnLock();
487 }
488 
489 /******************************************************************************/
490 /* Private: R e s e t */
491 /******************************************************************************/
492 
493 void XrdSys::IOEvents::Channel::Reset(XrdSys::IOEvents::Poller *thePoller,
494  int fd, int eNum)
495 {
496  chPoller = thePoller;
497  chFD = fd;
498  chFault = eNum;
499  chRTO = 0;
500  chWTO = 0;
501  chEvents = 0;
502  dlType = 0;
503  inPSet = 0;
504  reMod = 0;
505  rdDL = Poller::maxTime;
506  wrDL = Poller::maxTime;
507  deadLine = Poller::maxTime;
508 }
509 
510 /******************************************************************************/
511 /* S e t C a l l B a c k */
512 /******************************************************************************/
513 
515 {
516 
517 // We only need to have the channel lock to set the callback. If the object
518 // is in the process of being destroyed, we do nothing.
519 //
520  chMutex.Lock();
521  if (chStat != isDead)
522  {chCB = cbP;
523  chCBA = cbArg;
524  }
525  chMutex.UnLock();
526 }
527 
528 /******************************************************************************/
529 /* S e t F D */
530 /******************************************************************************/
531 
533 {
534  bool isLocked = true;
535 
536 // Obtain the channel lock. If the object is in callback mode we have some
537 // extra work to do. If normal callback then indicate the channel transitioned
538 // to prevent it being automatically re-enabled. If it's being destroyed, then
539 // do nothing. Otherwise, this is a stupid double setFD call.
540 //
541  chMutex.Lock();
542  if (chStat == isDead)
543  {chMutex.UnLock();
544  return;
545  }
546 
547 // This is a tricky deal here because we need to protect ourselves from other
548 // threads as well as the poller trying to do a callback. We first, set the
549 // poller target. This means the channel is no longer ready and callbacks will
550 // be skipped. We then remove the current file descriptor. This may unlock the
551 // channel but at this point that's ok.
552 //
553  if (inPSet)
554  {chPoller = &pollWait;
555  chPollXQ->Detach(this, isLocked, true);
556  if (!isLocked) chMutex.Lock();
557  }
558 
559 // Indicate channel needs to be re-enabled then unlock the channel
560 //
561  Reset(&pollInit, fd);
562  chMutex.UnLock();
563 }
564 
565 /******************************************************************************/
566 /* C l a s s P o l l e r */
567 /******************************************************************************/
568 /******************************************************************************/
569 /* C o n s t r u c t o r */
570 /******************************************************************************/
571 
573 {
574 
575 // Now initialize local class members
576 //
577  attBase = 0;
578  tmoBase = 0;
579  cmdFD = cFD;
580  reqFD = rFD;
581  wakePend = false;
582  pipeBuff = 0;
583  pipeBlen = 0;
584  pipePoll.fd = rFD;
585  pipePoll.events = POLLIN | POLLRDNORM;
586  tmoMask = 255;
587 }
588 
589 /******************************************************************************/
590 /* A t t a c h */
591 /******************************************************************************/
592 
593 void XrdSys::IOEvents::Poller::Attach(XrdSys::IOEvents::Channel *cP)
594 {
595  Channel *pcP;
596 
597 // We allow only one attach at a time to simplify the processing
598 //
599  adMutex.Lock();
600 
601 // Chain this channel into the list of attached channels
602 //
603  if ((pcP = attBase)) {INSERT(attList, pcP, cP);}
604  else attBase = cP;
605 
606 // All done
607 //
608  adMutex.UnLock();
609 }
610 
611 /******************************************************************************/
612 /* C b k T M O */
613 /******************************************************************************/
614 
616 {
617  Channel *cP;
618 
619 // Process each element in the timeout queue, calling the callback function
620 // if the timeout has passed. As this method can be called with a lock on the
621 // channel mutex, we need to drop it prior to calling the callback.
622 //
623  toMutex.Lock();
624  while((cP = tmoBase) && cP->deadLine <= time(0))
625  {int dlType = cP->dlType;
626  toMutex.UnLock();
627  CbkXeq(cP, dlType, 0, 0);
628  toMutex.Lock();
629  }
630  toMutex.UnLock();
631 }
632 
633 /******************************************************************************/
634 /* C b k X e q */
635 /******************************************************************************/
636 
638  int eNum, const char *eTxt)
639 {
640  XrdSysMutexHelper cbkMHelp(cP->chMutex);
641  char oldEvents;
642  bool cbok, retval, isRead, isWrite, isLocked = true;
643 
644 // Perform any required tracing
645 //
646  if (TRACING)
647  {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
648  (cP->chPoller == &pollInit ? "init" :
649  (cP->chPoller == &pollWait ? "wait" : "err")));
650  DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
651  <<" chev=" <<static_cast<int>(cP->chEvents)
652  <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
653  <<" callback " <<(cP->chCB ? "present" : "missing")
654  <<" poller=" <<cbtype);
655  }
656 
657 // Remove this from the timeout queue if there and reset the deadlines based
658 // on the event we are reflecting. This separates read and write deadlines
659 //
660  if (cP->inTOQ)
661  {TmoDel(cP);
662  cP->dlType |= (events & CallBack::ValidEvents) << 4;
663  isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
664  if (isRead) cP->rdDL = maxTime;
665  isWrite= events & (CallBack::ReadyToWrite | CallBack::WriteTimeOut);
666  if (isWrite) cP->wrDL = maxTime;
667  } else {
668  cP->dlType &= CallBack::ValidEvents;
669  isRead = isWrite = false;
670  }
671 
672 // Verify that there is a callback here and the channel is ready. If not,
673 // disable this channel for the events being reflected unless the event is a
674 // fatal error. In this case we need to abandon the channel since error events
675 // may continue to be generated as we can't always disable them.
676 //
677  if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
678  {if (eNum)
679  {cP->chPoller = &pollErr1; cP->chFault = eNum;
680  cP->inPSet = 0;
681  return false;
682  }
683  oldEvents = cP->chEvents;
684  cP->chEvents = 0;
685  retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
686  TRACE_MOD(CbkXeq,cP->chFD,0);
687  if (!isLocked) cP->chMutex.Lock();
688  cP->chEvents = oldEvents;
689  return true;
690  }
691 
692 // Resolve the problem where we get an error event but the channel wants them
693 // presented as a read or write event. If neither is possible then defer the
694 // error until the channel is enabled again.
695 //
696  if (eNum)
697  {if (cP->chEvents & Channel::errorEvents)
698  {cP->chPoller = &pollErr1; cP->chFault = eNum;
699  cP->chStat = Channel::isCBMode;
700  chDead = false;
701  cbkMHelp.UnLock();
702  cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
703  if (chDead) return true;
704  cbkMHelp.Lock(&(cP->chMutex));
705  cP->inPSet = 0;
706  return false;
707  }
708  if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
709  else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
710  else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
711  return false;
712  }
713  }
714 
715 // Indicate that we are in callback mode then drop the channel lock and effect
716 // the callback. This allows the callback to freely manage locks.
717 //
718  cP->chStat = Channel::isCBMode;
719  chDead = false;
720  // Detach() may be called after unlocking the channel and would zero the
721  // callback pointer and argument. So keep a copy.
722  CallBack *cb = cP->chCB;
723  void *cba = cP->chCBA;
724  cbkMHelp.UnLock();
725  IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
726  cbok = cb->Event(cP,cba, events);
727  IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
728 
729 // If channel destroyed by the callback, bail really fast. Otherwise, regain
730 // the channel lock.
731 //
732  if (chDead) return true;
733  cbkMHelp.Lock(&(cP->chMutex));
734 
735 // If the channel is being destroyed; then another thread must have done so.
736 // Tell it the callback has finished and just return.
737 //
738  if (cP->chStat != Channel::isCBMode)
739  {if (cP->chStat == Channel::isDead)
740  {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
741  // channel will be destroyed shortly after post, unlock mutex before
742  cbkMHelp.UnLock();
743  theSem->Post();
744  }
745  return true;
746  }
747  cP->chStat = Channel::isClear;
748 
749 // Handle enable or disable here. If we keep the channel enabled then reset
750 // the timeout if it hasn't been handled via a call from the callback.
751 //
752  if (!cbok) Detach(cP,isLocked,false);
753  else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
754  TmoAdd(cP, 0);
755 
756 // All done. While the mutex should not have been unlocked, we relock it if
757 // it has to keep the mutex helper from croaking.
758 //
759  if (!isLocked) cP->chMutex.Lock();
760  return true;
761 }
762 
763 /******************************************************************************/
764 /* Static: C r e a t e */
765 /******************************************************************************/
766 
768  const char **eTxt,
769  int crOpts)
770 {
771  int fildes[2];
772  struct pollArg pArg;
773  pthread_t tid;
774 
775 // Create a pipe used to break the poll wait loop
776 //
777  if (XrdSysFD_Pipe(fildes))
778  {eNum = errno;
779  if (eTxt) *eTxt = "creating poll pipe";
780  return 0;
781  }
782 
783 // Create an actual implementation of a poller
784 //
785  if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
786  {close(fildes[0]);
787  close(fildes[1]);
788  return 0;
789  }
790 
791 // Now start a thread to handle this poller object
792 //
794  (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
795  {if (eTxt) *eTxt = "creating poller thread"; return 0;}
796 
797 // Now wait for the thread to finish initializing before we allow use
798 // Note that the bootstrap takes ownership of the semaphore and will delete it
799 // once the thread posting the semaphore actually ends. This is to avoid
800 // semaphore bugs present in certain (e.g. Linux) kernels.
801 //
802  pArg.pollSync->Wait();
803 
804 // Check if all went well
805 //
806  if (pArg.retCode)
807  {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
808  eNum = pArg.retCode;
809  delete pArg.pollP;
810  return 0;
811  }
812 
813 // Set creation options in the new poller
814 //
815  if (crOpts & optTOM)
817 
818 // All done
819 //
820  eNum = 0;
821  if (eTxt) *eTxt = "";
822  return pArg.pollP;
823 }
824 
825 /******************************************************************************/
826 /* D e t a c h */
827 /******************************************************************************/
828 
829 void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
830  bool &isLocked, bool keep)
831 {
832 // The caller must hold the channel lock!
833 //
834  bool detFD = (cP->inPSet != 0);
835 
836 // First remove the channel from the timeout queue
837 //
838  if (cP->inTOQ)
839  {toMutex.Lock();
840  REMOVE(tmoBase, tmoList, cP);
841  toMutex.UnLock();
842  }
843 
844 // Allow only one detach at a time
845 //
846  adMutex.Lock();
847 
848 // Preset channel to prohibit callback if we are not keeping this channel
849 //
850  if (!keep)
851  {cP->Reset(&pollErr1, cP->chFD);
852  cP->chPollXQ = &pollErr1;
853  cP->chCB = 0;
854  cP->chCBA = 0;
855  if (cP->attList.next != cP) {REMOVE(attBase, attList, cP);}
856  else if (attBase == cP) attBase = 0;
857  }
858 
859 // Exclude this channel from the associated poll set, don't hold the ad lock
860 //
861  adMutex.UnLock();
862  if (detFD)
863  {cP->inPSet = 0;
864  if (cmdFD >= 0) Exclude(cP, isLocked, !ISPOLLER);
865  }
866 }
867 
868 /******************************************************************************/
869 /* Protected: G e t R e q u e s t */
870 /******************************************************************************/
871 
872 // Warning: This method runs unlocked. The caller must have exclusive use of
873 // the reqBuff otherwise unpredictable results will occur.
874 
876 {
877  ssize_t rlen;
878  int rc;
879 
880 // See if we are to resume a read or start a fresh one
881 //
882  if (!pipeBlen)
883  {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
884 
885 // Wait for the next request. Some OS's (like Linux) don't support non-blocking
886 // pipes. So, we must front the read with a poll.
887 //
888  do {rc = poll(&pipePoll, 1, 0);}
889  while(rc < 0 && (errno == EAGAIN || errno == EINTR));
890  if (rc < 1) return 0;
891 
892 // Now we can put up a read without a delay. Normally a full command will be
893 // present. Under some heavy conditions, this may not be the case.
894 //
895  do {rlen = read(reqFD, pipeBuff, pipeBlen);}
896  while(rlen < 0 && errno == EINTR);
897  if (rlen <= 0)
898  {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
899  return 0;
900  }
901 
902 // Check if all the data has arrived. If not all the data is present, defer
903 // this request until more data arrives.
904 //
905  if (!(pipeBlen -= rlen)) return 1;
906  pipeBuff += rlen;
907  return 0;
908 }
909 
910 /******************************************************************************/
911 /* Protected: I n i t */
912 /******************************************************************************/
913 
915  const char **eTxt, bool &isLocked)
916 {
917 // The channel must be locked upon entry!
918 //
919  bool retval;
920 
921 
922 // If we are already in progress then simply update the shadow events and
923 // resuppress all current events.
924 //
925  if (cP->chPoller == &pollWait)
926  {cP->reMod = cP->chEvents;
927  cP->chEvents = 0;
928  IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
929  return true;
930  }
931 
932 // Trace this entry
933 //
934  IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
935 
936 // If no events are enabled at this point, just return
937 //
938  if (!(cP->chEvents)) return true;
939 
940 // Refuse to enable a channel without a callback function
941 //
942  if (!(cP->chCB))
943  {eNum = EDESTADDRREQ;
944  if (eTxt) *eTxt = "enabling without a callback";
945  return false;
946  }
947 
948 // So, now we can include the channel in the poll set. We will include it
949 // with no events enabled to prevent callbacks prior to completion here.
950 //
951  cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
952  retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
953  IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
954  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
955 
956 // Determine what future poller to use. If we can use the regular poller then
957 // set the correct event mask for the channel. Note that we could have lost
958 // control but the correct events will be reflected in the "reMod" member.
959 //
960  if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
961  else {cP->chPoller = cP->chPollXQ;
962  cP->inPSet = 1;
963  if (cP->reMod)
964  {cP->chEvents = cP->reMod;
965  retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
966  TRACE_MOD(Init,cP->chFD,int(cP->reMod));
967  if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
968  } else {
969  TRACE_NOD(Init,cP->chFD,0);
970  }
971  }
972 
973 // All done
974 //
975  cP->reMod = 0;
976  return retval;
977 }
978 
979 /******************************************************************************/
980 /* P o l l 2 E n u m */
981 /******************************************************************************/
982 
984 {
985  if (events & POLLERR) return EPIPE;
986 
987  if (events & POLLHUP) return ECONNRESET;
988 
989  if (events & POLLNVAL) return EBADF;
990 
991  return EOPNOTSUPP;
992 }
993 
994 /******************************************************************************/
995 /* S e n d C m d */
996 /******************************************************************************/
997 
999 {
1000  int wlen;
1001 
1002 // Pipe writes are atomic so we don't need locks. Some commands require
1003 // confirmation. We handle that here based on the command. Note that pipes
1004 // gaurantee that all of the data will be written or we will block.
1005 //
1006  if (cmd.req >= PipeData::Post)
1007  {XrdSysSemaphore mySem(0);
1008  cmd.theSem = &mySem;
1009  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1010  while (wlen < 0 && errno == EINTR);
1011  if (wlen > 0) mySem.Wait();
1012  } else {
1013  do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1014  while (wlen < 0 && errno == EINTR);
1015  }
1016 
1017 // All done
1018 //
1019  return (wlen >= 0 ? 0 : errno);
1020 }
1021 
1022 /******************************************************************************/
1023 /* Protected: S e t P o l l E n t */
1024 /******************************************************************************/
1025 
1027 {
1028  cP->pollEnt = pe;
1029 }
1030 
1031 /******************************************************************************/
1032 /* S t o p */
1033 /******************************************************************************/
1034 
1036 {
1037  PipeData cmdbuff;
1038  CallBack *theCB;
1039  Channel *cP;
1040  void *cbArg;
1041  int doCB;
1042 
1043 // Initialize the pipdata structure
1044 //
1045  memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1046  cmdbuff.req = PipeData::Stop;
1047 
1048 // Lock all of this
1049 //
1050  adMutex.Lock();
1051 
1052 // If we are already shutdown then we are done
1053 //
1054  if (cmdFD == -1) {adMutex.UnLock(); return;}
1055 
1056 // First we must stop the poller thread in an orderly fashion.
1057 //
1058  adMutex.UnLock();
1059  SendCmd(cmdbuff);
1060  adMutex.Lock();
1061 
1062 // Close the pipe communication mechanism
1063 //
1064  close(cmdFD); cmdFD = -1;
1065  close(reqFD); reqFD = -1;
1066 
1067 // Run through cleaning up the channels. While there should not be any other
1068 // operations happening on this poller, we take the conservative approach.
1069 //
1070  while((cP = attBase))
1071  {REMOVE(attBase, attList, cP);
1072  adMutex.UnLock();
1073  cP->chMutex.Lock();
1074  doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1075  if (cP->inTOQ) TmoDel(cP);
1076  cP->Reset(&pollErr1, cP->chFD, EIDRM);
1077  cP->chPollXQ = &pollErr1;
1078  if (doCB)
1079  {cP->chStat = Channel::isClear;
1080  theCB = cP->chCB; cbArg = cP->chCBA;
1081  cP->chMutex.UnLock();
1082  theCB->Stop(cP, cbArg);
1083  } else cP->chMutex.UnLock();
1084  adMutex.Lock();
1085  }
1086 
1087 // Now invoke the poller specific shutdown
1088 //
1089  Shutdown();
1090  adMutex.UnLock();
1091 }
1092 
1093 /******************************************************************************/
1094 /* T m o A d d */
1095 /******************************************************************************/
1096 
1098 {
1099  XrdSysMutexHelper mHelper(toMutex);
1100  time_t tNow;
1101  Channel *ncP;
1102  bool setRTO, setWTO;
1103 
1104 // Do some tracing
1105 //
1106  IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1107  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1108 
1109 // Remove element from timeout queue if it is there
1110 //
1111  if (cP->inTOQ)
1112  {REMOVE(tmoBase, tmoList, cP);
1113  cP->inTOQ = 0;
1114  }
1115 
1116 // Determine which timeouts need to be reset
1117 //
1118  tmoSet|= cP->dlType >> 4;
1119  setRTO = (tmoSet&tmoMask) & (CallBack::ReadyToRead |CallBack:: ReadTimeOut);
1120  setWTO = (tmoSet&tmoMask) & (CallBack::ReadyToWrite|CallBack::WriteTimeOut);
1121 
1122 // Reset the required deadlines
1123 //
1124  tNow = time(0);
1125  if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1126  cP->rdDL = cP->chRTO + tNow;
1127  if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1128  cP->wrDL = cP->chWTO + tNow;
1129 
1130 // Calculate the closest enabled deadline
1131 //
1132  if (cP->rdDL < cP->wrDL)
1133  {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1134  } else {
1135  cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1136  if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1137  }
1138  IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1139  <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1140 
1141 // If no timeout really applies, we are done
1142 //
1143  if (cP->deadLine == maxTime) return false;
1144 
1145 // Add the channel to the timeout queue in correct deadline position.
1146 //
1147  if ((ncP = tmoBase))
1148  {do {if (cP->deadLine < ncP->deadLine) break;
1149  ncP = ncP->tmoList.next;
1150  } while(ncP != tmoBase);
1151  INSERT(tmoList, ncP, cP);
1152  if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1153  } else tmoBase = cP;
1154  cP->inTOQ = 1;
1155 
1156 // Indicate to the caller whether or not a wakeup is required
1157 //
1158  return (tmoBase == cP);
1159 }
1160 
1161 /******************************************************************************/
1162 /* T m o D e l */
1163 /******************************************************************************/
1164 
1166 {
1167 
1168 // Do some tracing
1169 //
1170  IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1171  <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1172 
1173 // Get the timeout queue lock and remove the channel from the queue
1174 //
1175  toMutex.Lock();
1176  REMOVE(tmoBase, tmoList, cP);
1177  cP->inTOQ = 0;
1178  toMutex.UnLock();
1179 }
1180 
1181 /******************************************************************************/
1182 /* T m o G e t */
1183 /******************************************************************************/
1184 
1186 {
1187  int wtval;
1188 
1189 // Lock the timeout queue
1190 //
1191  toMutex.Lock();
1192 
1193 // Calculate wait time. If the deadline passed, invoke the timeout callback.
1194 // we will need to drop the timeout lock as we don't have the channel lock.
1195 //
1196  do {if (!tmoBase) {wtval = -1; break;}
1197  wtval = (tmoBase->deadLine - time(0)) * 1000;
1198  if (wtval > 0) break;
1199  toMutex.UnLock();
1200  CbkTMO();
1201  toMutex.Lock();
1202  } while(1);
1203 
1204 // Return the value
1205 //
1206  CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1207  toMutex.UnLock();
1208  return wtval;
1209 }
1210 
1211 /******************************************************************************/
1212 /* W a k e U p */
1213 /******************************************************************************/
1214 
1215 void XrdSys::IOEvents::Poller::WakeUp()
1216 {
1217  static PipeData cmdbuff(PipeData::NoOp);
1218 
1219 // Send it off to wakeup the poller thread, but only if here is no wakeup in
1220 // progress.
1221 //
1222 // We use a mutex here because we want to produce a synchronization point - all
1223 // threads that might be interested timeouts and wakeups are going to incur a
1224 // cache bounce for the page where wakePend resides; they will see a consistent
1225 // view of the wakePend flag. For those threads, this is equivalent to
1226 // an atomic with memory_order std::memory_order_seq_cst (the strongest ordering).
1227 // However, the threads that are not interested in timeouts will not get a flush
1228 // for their copy of the wakePend page. They will still have the weaker memory
1229 // ordering of consume/release (which is guaranteed anyway on all current architectures
1230 // except for DEC Alpha).
1231  toMutex.Lock();
1232  bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
1233  if (isWakePend) {toMutex.UnLock();}
1234  else {CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
1235  toMutex.UnLock();
1236  SendCmd(cmdbuff);
1237  }
1238 }
1239 
1240 /******************************************************************************/
1241 /* I m p l e m e n t a t i o n S p e c i f i c s */
1242 /******************************************************************************/
1243 
1244 #if defined( __solaris__ )
1246 #elif defined( __linux__ )
1248 #elif defined(__APPLE__)
1250 #else
1252 #endif
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:48
#define write(a, b, c)
Definition: XrdPosix.hh:123
#define CPP_ATOMIC_LOAD(x, order)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
#define IF_TRACE(x, fd, y)
#define TRACE_LOK
#define TRACING
#define REMOVE(dlbase, dlvar, curitem)
#define STATUS
#define TRACE_NOD(x, fd, y)
#define STATUSOF(x)
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define ISPOLLER
#define INSERT(dlvar, curitem, newitem)
#define WEVENTS(x)
#define XRDSYSTHREAD_BIND
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static pthread_t ID(void)
static void * Start(void *parg)
virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
virtual void Stop(Channel *chP, void *cbArg)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ WriteTimeOut
Write timeout.
@ ValidEvents
Mask to test for valid events.
void SetCallBack(CallBack *cbP, void *cbArg=0)
void GetCallBack(CallBack **cbP, void **cbArg)
@ errorEvents
Error event non-r/w specific.
@ stopEvent
Poller stop event.
bool Enable(int events, int timeout=0, const char **eText=0)
Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0)
bool Disable(int events, const char **eText=0)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
virtual bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
int GetFault(Channel *cP)
Poller(int cFD, int rFD)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
void TmoDel(Channel *cP)
virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)
int SendCmd(PipeData &cmd)
int Poll2Enum(short events)
bool TmoAdd(Channel *cP, int tmoSet)
void SetPollEnt(Channel *cP, int ptEnt)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)
@ dec
Definition: XrdSysTrace.hh:42
@ hex
Definition: XrdSysTrace.hh:42
XrdSysSemaphore * pollSync