XRootD
XrdPollPoll.icc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d P o l l P o l l . i c c */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <signal.h>
31 #include <cstdlib>
32 
33 #include "Xrd/XrdLinkCtl.hh"
34 #include "Xrd/XrdPollPoll.hh"
35 #include "Xrd/XrdScheduler.hh"
36 
37 /******************************************************************************/
38 /* n e w P o l l e r */
39 /******************************************************************************/
40 
41 XrdPoll *XrdPoll::newPoller(int pollid, int maxfd)
42 {
43  int bytes, alignment, pagsz = getpagesize();
44  struct pollfd *pp;
45 
46 // Calculate the size of the poll table and allocate it
47 //
48  bytes = maxfd * sizeof(struct pollfd);
49  alignment = (bytes < pagsz ? 1024 : pagsz);
50  if (posix_memalign((void **)&pp, alignment, bytes))
51  {Log.Emsg("Poll", ENOMEM, "create poll table");
52  return 0;
53  }
54 
55 // Create new poll object
56 //
57  memset((void *)pp, 0, bytes);
58  return (XrdPoll *)new XrdPollPoll(pp, maxfd);
59 }
60 
61 /******************************************************************************/
62 /* C o n s t r u c t o r */
63 /******************************************************************************/
64 
65 XrdPollPoll::XrdPollPoll(struct pollfd *pp, int numfd)
66 {
67 
68 // Initialize the standard stuff
69 //
70  PollTab = pp;
71  PollTNum= 0;
72  PollQ = 0;
73  maxent = numfd;
74 }
75 
76 /******************************************************************************/
77 /* D e s t r u c t o r */
78 /******************************************************************************/
79 
81 {
82  if (PollTab) free(PollTab);
83 }
84 
85 /******************************************************************************/
86 /* I n c l u d e */
87 /******************************************************************************/
88 
90 {
91  struct pollfd *pfd;
92  int ptnum;
93 
94 // Lock down the poll data structure
95 //
96  PollMutex.Lock();
97 
98 // Get the next entry to be used
99 //
100  ptnum = 0;
101  while((ptnum < PollTNum) && (PollTab[ptnum].fd != -1)) ptnum++;
102 
103 // Make sure we have enough table entries to add this link
104 //
105  if (ptnum > maxent)
106  {Log.Emsg("Attach","Attach",pInfo.Link.ID,"failed; poll table overflow.");
107  PollMutex.UnLock();
108  return 0;
109  }
110 
111 // Initialize the polltable entry
112 //
113  pfd = &(PollTab[ptnum]);
114  pfd->fd = -pInfo.FD;
115  pfd->events = POLLIN | POLLRDNORM;
116  pfd->revents = 0;
117 
118 // Record relevant information in the link
119 //
120  pInfo.PollEnt = pfd;
121  if (ptnum == PollTNum) PollTNum++;
122 
123 // All done
124 //
125  PollMutex.UnLock();
126  return 1;
127 }
128 
129 /******************************************************************************/
130 /* D i s a b l e */
131 /******************************************************************************/
132 
133 void XrdPollPoll::Disable(XrdPollInfo &pInfo, const char *etxt)
134 {
135  XrdSysSemaphore mySem(0);
136  PipeData cmdbuff[2];
137  int myerrno = 0;
138 
139 // Check if this link is in the pollQ. If so, remove it.
140 //
141  if (pInfo.inQ) dqLink(&pInfo);
142 
143 // Simply return if the link is already disabled
144 //
145  if (!pInfo.isEnabled) return;
146 
147 // Trace this event
148 //
149  TRACEI(POLL, "Poller " <<PID <<" async disabling link FD " <<pInfo.FD);
150 
151 // Send a disable request to the poller thread handling this link. We need to
152 // wait until the operation is actually completed before returning.
153 //
154  memset(&cmdbuff, 0, sizeof(cmdbuff));
155  cmdbuff[0].req = PipeData::DiFD;
156  cmdbuff[0].Parms.Arg.fd = pInfo.FD;
157  cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
158  cmdbuff[1].req = PipeData::Post;
159  cmdbuff[1].Parms.theSem = &mySem;
160  PollPipe.Lock();
161  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
162  PollPipe.UnLock();
163 
164 // Verify that all went well and if termination wanted, terminate the link
165 //
166  if (myerrno) Log.Emsg("Poll", myerrno, "disable link", pInfo.Link.ID);
167  else {mySem.Wait();
168  if (etxt && Finish(pInfo, etxt))
169  Sched.Schedule((XrdJob *)&pInfo.Link);
170  }
171 }
172 
173 /******************************************************************************/
174 /* E n a b l e */
175 /******************************************************************************/
176 
178 {
179  PipeData cmdbuff;
180  int myerrno = 0;
181 
182 // Simply return if the link is already enabled
183 //
184  if (pInfo.isEnabled) return 1;
185 
186 // Add this link element to the queue
187 //
188  PollMutex.Lock();
189  pInfo.Next = PollQ;
190  PollQ = &pInfo;
191  pInfo.inQ = true;
192  PollMutex.UnLock();
193 
194 // Send an enable request to the poller thread handling this link
195 //
196  TRACEI(POLL, "sending poller " <<PID <<" enable for link " <<pInfo.FD);
197  cmdbuff.req = PipeData::EnFD;
198  cmdbuff.Parms.Arg.fd = pInfo.FD;
199  cmdbuff.Parms.Arg.ent = pInfo.PollEnt - PollTab;
200  PollPipe.Lock();
201  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
202  PollPipe.UnLock();
203 
204 // Verify that all went well. Note that the link stays in the pollQ.
205 //
206  if (myerrno)
207  {Log.Emsg("Poll", myerrno, "enable link", pInfo.Link.ID); return 0;}
208 
209 // All done
210 //
211  return 1;
212 }
213 
214 /******************************************************************************/
215 /* E x c l u d e */
216 /******************************************************************************/
217 
219 {
220  XrdSysSemaphore mySem(0);
221  PipeData cmdbuff[2];
222  int myerrno = 0;
223 
224 // Make sure this link is not enabled
225 //
226  if (pInfo.isEnabled)
227  {Log.Emsg("Poll", "Detach of enabled link", pInfo.Link.ID);
228  Disable(pInfo);
229  }
230  else if (pInfo.inQ) dqLink(&pInfo);
231 
232 // Send a deatch request to the poller thread handling this link
233 //
234  TRACEI(POLL, "sending poller " <<PID <<" detach for link " <<pInfo.FD);
235  cmdbuff[0].req = PipeData::RmFD;
236  cmdbuff[0].Parms.Arg.fd = pInfo.FD;
237  cmdbuff[0].Parms.Arg.ent = pInfo.PollEnt - PollTab;
238  cmdbuff[1].req = PipeData::Post;
239  cmdbuff[1].Parms.theSem = &mySem;
240  PollPipe.Lock();
241  if (write(CmdFD, &cmdbuff, sizeof(cmdbuff)) < 0) myerrno = errno;
242  PollPipe.UnLock();
243 
244 // Verify that all went well and if termination wanted, terminate the link
245 //
246  if (myerrno) Log.Emsg("Poll", myerrno, "detach link", pInfo.Link.ID);
247  else mySem.Wait();
248 }
249 
250 /******************************************************************************/
251 /* S t a r t */
252 /******************************************************************************/
253 
254 void XrdPollPoll::Start(XrdSysSemaphore *syncsem, int &retcode)
255 {
256  int numpolled, num2sched;
257  XrdJob *jfirst, *jlast;
258  XrdPollInfo *plp, *nlp, *pInfo;
259  XrdLink *lp;
260  short pollevents;
261  const short pollOK = POLLIN | POLLRDNORM;
262 
263 // Set up he first entry in the poll table to be our communications port
264 //
265  PollTab[0].fd = ReqFD;
266  PollTab[0].events = pollOK;
267  PollTab[0].revents = 0;
268  PollTNum = 1;
269 
270 // Signal the caller to continue
271 //
272  retcode = 0;
273  syncsem->Post();
274 
275 // Now do the main poll loop
276 //
277  do {do {numpolled = poll(PollTab, PollTNum, -1);}
278  while(numpolled < 0 && (errno == EAGAIN || errno == EINTR));
279 
280  // Check if we had a polling error
281  //
282  if (numpolled < 0)
283  {if (errno != EINTR) Restart(errno);
284  else numInterrupts++;
285  continue;
286  }
287  numEvents += numpolled;
288 
289  // Check out base poll table entry, we can do this without a lock
290  //
291  if (PollTab[0].revents & pollOK)
292  {doRequests(numpolled);
293  if (--numpolled <= 0) continue;
294  }
295 
296  // Checkout which links must be dispatched (do this locked)
297  //
298  PollMutex.Lock();
299  plp = 0; nlp = PollQ; jfirst = jlast = 0; num2sched = 0;
300  while ((pInfo = nlp) && numpolled > 0)
301  {if ((pollevents = pInfo->PollEnt->revents))
302  {pInfo->PollEnt->fd = -pInfo->PollEnt->fd;
303  if (plp) nlp = plp->Next = pInfo->Next;
304  else nlp = PollQ = pInfo->Next;
305  numpolled--; pInfo->inQ = false;
306  if (!(pollevents & pollOK))
307  Finish(*pInfo, Poll2Text(pollevents));
308  lp = &(pInfo->Link);
309  if (!(pInfo->isEnabled))
310  Log.Emsg("Poll", "Disabled event occurred for", lp->ID);
311  else {pInfo->isEnabled = false;
312  lp->NextJob = jfirst; jfirst = (XrdJob *)lp;
313  if (!jlast) jlast=(XrdJob *)lp;
314  num2sched++;
315  continue;
316  }
317  }
318  plp = pInfo; nlp = pInfo->Next;
319  }
320  if (numpolled) Recover(numpolled);
321  PollMutex.UnLock();
322 
323  // Schedule the polled links
324  //
325  if (num2sched == 1) Sched.Schedule(jfirst);
326  else if (num2sched) Sched.Schedule(num2sched, jfirst, jlast);
327  } while(1);
328 }
329 
330 /******************************************************************************/
331 /* P r i v a t e M e t h o d s */
332 /******************************************************************************/
333 /******************************************************************************/
334 /* d o D e t a c h */
335 /******************************************************************************/
336 
338 {
339  int lastent;
340 
341 // Get some starting values
342 //
343  PollMutex.Lock();
344  if ((lastent = PollTNum-1) < 0)
345  {Log.Emsg("Poll","Underflow during detach"); abort();}
346 
347  if (pti == lastent)
348  do {PollTNum--;} while(PollTNum && PollTab[PollTNum-1].fd == -1);
349  PollMutex.UnLock();
350 }
351 
352 /******************************************************************************/
353 /* d o R e q u e s t s */
354 /******************************************************************************/
355 
356 void XrdPollPoll::doRequests(int maxreq)
357 {
358  const char *act;
359  int pti, ptfd, num2do;
360  XrdPollInfo *piP;
361 
362 // To keep ourselves from being swamped, base request read-aheads on the number
363 // of pending poll events.
364 //
365  num2do = (maxreq < 3 ? -1 : maxreq);
366 
367 // Now process all poll table manipulation requests
368 //
369  while(num2do-- && getRequest())
370  { if (ReqBuff.req == PipeData::Post)
371  {ReqBuff.Parms.theSem->Post();
372  continue;
373  }
374  pti = ReqBuff.Parms.Arg.ent;
375  if ((ptfd = abs(PollTab[pti].fd)) != ReqBuff.Parms.Arg.fd)
376  {LogEvent(ReqBuff.req, PollTab[pti].fd, ReqBuff.Parms.Arg.fd);
377  continue;
378  }
379  if (!(piP = XrdLinkCtl::fd2PollInfo(ptfd)))
380  {LogEvent(ReqBuff.req, -1, ptfd); continue;}
381  if (ReqBuff.req == PipeData::EnFD)
382  {PollTab[pti].events = POLLIN | POLLRDNORM;
383  PollTab[pti].fd = ptfd;
384  piP->isEnabled = true; numEnabled++;
385  act = " enabled fd ";
386  }
387  else if (ReqBuff.req == PipeData::DiFD)
388  {PollTab[pti].fd = -ptfd;
389  act = " disabled fd ";
390  piP->isEnabled = false;
391  }
392  else if (ReqBuff.req == PipeData::RmFD)
393  {PollTab[pti].fd = -1;
394  doDetach(pti);
395  act = " detached fd ";
396  piP->isEnabled = false;
397  }
398  else {Log.Emsg("Poll", "Received an invalid poll pipe request");
399  continue;
400  }
401  TRACE(POLL, "Poller " <<PID <<act <<ReqBuff.Parms.Arg.fd
402  <<" entry " <<pti <<" now at " <<PollTNum);
403  }
404 }
405 
406 /******************************************************************************/
407 /* d q L i n k */
408 /******************************************************************************/
409 
410 void XrdPollPoll::dqLink(XrdPollInfo *pInfo)
411 {
412  XrdPollInfo *plp, *nlp;
413 
414 // Find matching link in the queue
415 //
416  PollMutex.Lock();
417  pInfo->inQ = false;
418  plp = 0; nlp = PollQ;
419  while (nlp && (pInfo != nlp)) {plp=nlp; nlp = nlp->Next;}
420 
421 // If we found the link, remove it. Otherwise complain
422 //
423  if (nlp) {if (plp) plp->Next = nlp->Next;
424  else PollQ = nlp->Next;
425  PollMutex.UnLock();
426  }
427  else {PollMutex.UnLock();
428  Log.Emsg("dqLink", "Link not found in Q", pInfo->Link.ID);
429  }
430 }
431 
432 /******************************************************************************/
433 /* L o g E v e n t */
434 /******************************************************************************/
435 
436 void XrdPollPoll::LogEvent(int req, int pollfd, int cmdfd)
437 {
438  const char *opn, *id1, *id2;
439  char buff[4096];
440  XrdLink *lp;
441 
442  if (ReqBuff.req == PipeData::EnFD) opn = "enable";
443  else if (ReqBuff.req == PipeData::DiFD) opn = "disable";
444  else if (ReqBuff.req == PipeData::RmFD) opn = "detach";
445  else opn = "???";
446 
447  if (pollfd < 0)
448  {sprintf(buff, "poll %d failed; FD %d", PID, cmdfd);
449  Log.Emsg("Poll", opn, buff, "does not map to a link");
450  return;
451  }
452 
453  if ((lp = XrdLinkCtl::fd2link(pollfd))) id1 = lp->ID;
454  else id1 = "unknown";
455  if ((lp = XrdLinkCtl::fd2link(cmdfd))) id2 = lp->ID;
456  else id2 = "unknown";
457  snprintf(buff, sizeof(buff)-1,
458  "%d poll fd=%d (%s) not equal %s cmd fd=%d (%s).",
459  PID, pollfd, id1, opn, cmdfd, id2);
460 
461  Log.Emsg("Poll", "cmd/poll mismatch:", buff);
462 }
463 
464 /******************************************************************************/
465 /* R e c o v e r */
466 /******************************************************************************/
467 
468 void XrdPollPoll::Recover(int numleft)
469 {
470  int i;
471  XrdPollInfo *piP;
472 
473 // Turn off any unaccounted links
474 //
475  for (i = 1; i < PollTNum; i++)
476  if (PollTab[i].revents)
477  {if (!(piP = XrdLinkCtl::fd2PollInfo(PollTab[i].fd)))
478  PollTab[i].fd = -1;
479  else {piP->isEnabled = false;
480  PollTab[i].fd = -PollTab[i].fd;
481  Log.Emsg("Poll","Improper poll event for",piP->Link.ID);
482  }
483  }
484 }
485 
486 /******************************************************************************/
487 /* R e s t a r t */
488 /******************************************************************************/
489 
490 void XrdPollPoll::Restart(int ecode)
491 {
492  XrdPollInfo *pInfo;
493 
494 // Issue error message
495 //
496  TRACE(POLL, PID <<'-' <<TID <<" Poll error " <<ecode);
497  Log.Emsg("Poll", errno, "poll");
498 
499 // For any outstanding link here, close the link and detach it
500 //
501  PollMutex.Lock();
502  while((pInfo = PollQ))
503  {PollQ = pInfo->Next;
504  pInfo->PollEnt->fd = -1;
505  Finish(*pInfo, "Unexpected polling error");
506  Sched.Schedule((XrdJob *)&(pInfo->Link));
507  }
508  PollMutex.UnLock();
509 }
ssize_t write(int fildes, const void *buf, size_t nbyte)
#define TRACE(act, x)
Definition: XrdTrace.hh:63
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
Definition: XrdJob.hh:43
XrdJob * NextJob
Definition: XrdJob.hh:46
static XrdLink * fd2link(int fd)
Definition: XrdLinkCtl.hh:72
static XrdPollInfo * fd2PollInfo(int fd)
Definition: XrdLinkCtl.hh:103
struct pollfd * PollEnt
Definition: XrdPollInfo.hh:42
XrdPollInfo * Next
Definition: XrdPollInfo.hh:40
XrdLink & Link
Definition: XrdPollInfo.hh:41
bool isEnabled
Definition: XrdPollInfo.hh:46
int Include(XrdPollInfo &pInfo)
Definition: XrdPollPoll.icc:89
XrdPollPoll(struct pollfd *pp, int numfd)
Definition: XrdPollPoll.icc:65
void doDetach(int pti)
void Start(XrdSysSemaphore *syncp, int &rc)
void Disable(XrdPollInfo &pInfo, const char *etxt=0)
void Exclude(XrdPollInfo &pInfo)
int Enable(XrdPollInfo &pInfo)
int numInterrupts
Definition: XrdPoll.hh:134
pthread_t TID
Definition: XrdPoll.hh:83
int PID
Definition: XrdPoll.hh:82
XrdSysMutex PollPipe
Definition: XrdPoll.hh:115
int ReqFD
Definition: XrdPoll.hh:118
int numEvents
Definition: XrdPoll.hh:133
int getRequest()
Definition: XrdPoll.cc:232
PipeData ReqBuff
Definition: XrdPoll.hh:126
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static XrdPoll * newPoller(int pollid, int numfd)
Definition: XrdPollE.icc:45
static int Finish(XrdPollInfo &pInfo, const char *etxt=0)
Definition: XrdPoll.cc:204
int numEnabled
Definition: XrdPoll.hh:132
int CmdFD
Definition: XrdPoll.hh:117
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
XrdSysError Log
Definition: XrdConfig.cc:112
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
union XrdPoll::PipeData::@18 Parms