xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
21 
23 #include "XrdCl/XrdClSocket.hh"
25 #include "XrdSys/XrdSysPageSize.hh"
26 
27 #include <sys/uio.h>
28 #include <memory>
29 #include <arpa/inet.h>
30 
31 namespace XrdCl
32 {
33 
34 //------------------------------------------------------------------------------
36 //------------------------------------------------------------------------------
38 {
39  public:
40 
41  //--------------------------------------------------------------------------
46  //--------------------------------------------------------------------------
48  std::vector<uint32_t> &digests ) :
49  chunks( chunks ),
50  digests( digests ),
51  dlen( 0 ),
52  rspoff( 0 ),
53  chindex( 0 ),
54  choff( 0 ),
55  dgindex( 0 ),
56  dgoff( 0 ),
57  iovcnt( 0 ),
58  iovindex( 0 )
59  {
60  uint64_t rdoff = chunks.front().offset;
61  uint32_t rdlen = 0;
62  for( auto &ch : chunks )
63  rdlen += ch.length;
64  int fpglen, lpglen;
65  int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
66  digests.resize( pgcnt );
67  }
68 
69  //--------------------------------------------------------------------------
71  //--------------------------------------------------------------------------
72  virtual ~AsyncPageReader()
73  {
74  }
75 
76  //--------------------------------------------------------------------------
78  //--------------------------------------------------------------------------
79  void SetRsp( ServerResponseV2 *rsp )
80  {
81  dlen = rsp->status.bdy.dlen;
82  rspoff = rsp->info.pgread.offset;
83 
84  uint64_t bufoff = rspoff - chunks[0].offset;
85  chindex = 0;
86 
87  for( chindex = 0; chindex < chunks.size(); ++chindex )
88  {
89  if( chunks[chindex].length < bufoff )
90  {
91  bufoff -= chunks[chindex].length;
92  continue;
93  }
94  break;
95  }
96  choff = bufoff;
97  }
98 
99  //--------------------------------------------------------------------------
104  //--------------------------------------------------------------------------
105  XRootDStatus Read( Socket &socket, uint32_t &btsread )
106  {
107  if( dlen == 0 || chindex >= chunks.size() )
108  return XRootDStatus();
109  btsread = 0;
110  int nbbts = 0;
111  do
112  {
113  // Prepare the IO vector for receiving the data
114  if( iov.empty() )
115  InitIOV();
116  // read the data into the buffer
117  nbbts = 0;
118  auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
119  if( !st.IsOK() )
120  return st;
121  btsread += nbbts;
122  dlen -= nbbts;
123  ShiftIOV( nbbts );
124  if( st.code == suRetry )
125  return st;
126  }
127  while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
128 
129  return XRootDStatus();
130  }
131 
132  private:
133 
134  //--------------------------------------------------------------------------
136  //--------------------------------------------------------------------------
137  struct iovmax_t
138  {
140  {
141 #ifdef _SC_IOV_MAX
142  value = sysconf(_SC_IOV_MAX);
143  if (value == -1)
144 #endif
145 #ifdef IOV_MAX
146  value = IOV_MAX;
147 #else
148  value = 1024;
149 #endif
150  value &= ~uint32_t( 1 ); // make sure it is an even number
151  }
152  int32_t value;
153  };
154 
155  //--------------------------------------------------------------------------
157  //--------------------------------------------------------------------------
158  inline static int max_iovcnt()
159  {
160  static iovmax_t iovmax;
161  return iovmax.value;
162  }
163 
164  //--------------------------------------------------------------------------
166  //--------------------------------------------------------------------------
167  inline void addiov( char *&buf, size_t len )
168  {
169  iov.emplace_back();
170  iov.back().iov_base = buf;
171  iov.back().iov_len = len;
172  buf += len;
173  ++iovcnt;
174  }
175 
176  //--------------------------------------------------------------------------
178  //--------------------------------------------------------------------------
179  inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
180  {
181  if( len > dleft ) len = dleft;
182  addiov( buf, len );
183  dleft -= len;
184  }
185 
186  //--------------------------------------------------------------------------
189  //--------------------------------------------------------------------------
190  inline static uint32_t CalcIOVSize( uint32_t dleft )
191  {
192  uint32_t ret = ( dleft / PageWithDigest + 2 ) * 2;
193  return ( ret > uint32_t( max_iovcnt() ) ? max_iovcnt() : ret );
194  }
195 
196  //--------------------------------------------------------------------------
198  //--------------------------------------------------------------------------
199  uint32_t CalcRdSize()
200  {
201  // data size in the server response (including digests)
202  uint32_t dleft = dlen;
203  // space in our page buffer
204  uint32_t pgspace = chunks[chindex].length - choff;
205  // space in our digest buffer
206  uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
207  if( dleft > pgspace + dgspace )
208  dleft = pgspace + dgspace;
209  return dleft;
210  }
211 
212  //--------------------------------------------------------------------------
214  //--------------------------------------------------------------------------
215  void InitIOV()
216  {
217  iovindex = 0;
218  // figure out the number of data we can read in one go
219  uint32_t dleft = CalcRdSize();
220  // and reset the I/O vector
221  iov.clear();
222  iovcnt = 0;
223  iov.reserve( CalcIOVSize( dleft ) );
224  // now prepare the page and digest buffers
225  ChunkInfo ch = chunks[chindex];
226  char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
227  uint64_t rdoff = ch.offset + choff;
228  char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
229  // handle the first digest
230  uint32_t fdglen = sizeof( uint32_t ) - dgoff;
231  addiov( dgbuf, fdglen, dleft );
232  if( dleft == 0 || iovcnt >= max_iovcnt() )
233  return;
234  // handle the first page
235  uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
236  addiov( pgbuf, fpglen, dleft );
237  if( dleft == 0 || iovcnt >= max_iovcnt() )
238  return;
239  // handle all the subsequent aligned pages
240  size_t fullpgs = dleft / PageWithDigest;
241  for( size_t i = 0; i < fullpgs; ++i )
242  {
243  addiov( dgbuf, sizeof( uint32_t ), dleft );
244  if( dleft == 0 || iovcnt >= max_iovcnt() )
245  return;
246  addiov( pgbuf, XrdSys::PageSize, dleft );
247  if( dleft == 0 || iovcnt >= max_iovcnt() )
248  return;
249  }
250  // handle the last digest
251  uint32_t ldglen = sizeof( uint32_t );
252  addiov( dgbuf, ldglen, dleft );
253  if( dleft == 0 || iovcnt >= max_iovcnt() )
254  return;
255  // handle the last page
256  addiov( pgbuf, dleft, dleft );
257  }
258 
259  //--------------------------------------------------------------------------
261  //--------------------------------------------------------------------------
262  inline void shift( void *&buffer, size_t nbbts )
263  {
264  char *buf = static_cast<char*>( buffer );
265  buf += nbbts;
266  buffer = buf;
267  }
268 
269  //--------------------------------------------------------------------------
273  //--------------------------------------------------------------------------
274  inline void shiftdgbuf( uint32_t &btsread )
275  {
276  if( iov[iovindex].iov_len > btsread )
277  {
278  iov[iovindex].iov_len -= btsread;
279  shift( iov[iovindex].iov_base, btsread );
280  dgoff += btsread;
281  btsread = 0;
282  return;
283  }
284 
285  btsread -= iov[iovindex].iov_len;
286  iov[iovindex].iov_len = 0;
287  dgoff = 0;
288  digests[dgindex] = ntohl( digests[dgindex] );
289  ++dgindex;
290  ++iovindex;
291  --iovcnt;
292  }
293 
294  //--------------------------------------------------------------------------
298  //--------------------------------------------------------------------------
299  inline void shiftpgbuf( uint32_t &btsread )
300  {
301  if( iov[iovindex].iov_len > btsread )
302  {
303  iov[iovindex].iov_len -= btsread;
304  shift( iov[iovindex].iov_base, btsread );
305  choff += btsread;
306  btsread = 0;
307  return;
308  }
309 
310  btsread -= iov[iovindex].iov_len;
311  choff += iov[iovindex].iov_len;
312  iov[iovindex].iov_len = 0;
313  ++iovindex;
314  --iovcnt;
315  }
316 
317  //--------------------------------------------------------------------------
319  //--------------------------------------------------------------------------
320  void ShiftIOV( uint32_t btsread )
321  {
322  // if iovindex is even it point to digest, otherwise it points to a page
323  if( iovindex % 2 == 0 )
324  shiftdgbuf( btsread );
325  // adjust as many I/O buffers as necessary
326  while( btsread > 0 )
327  {
328  // handle page
329  shiftpgbuf( btsread );
330  if( btsread == 0 ) break;
331  // handle digest
332  shiftdgbuf( btsread );
333  }
334  // if we filled the buffer, move to the next one
335  if( iovcnt == 0 )
336  iov.clear();
337  // do we need to move to the next chunk?
338  if( choff >= chunks[chindex].length )
339  {
340  ++chindex;
341  choff = 0;
342  }
343  }
344 
345  ChunkList &chunks; //< list of data chunks to be filled with user data
346  std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
347  uint32_t dlen; //< size of the data in the message
348  uint64_t rspoff; //< response offset
349 
350  size_t chindex; //< index of the current data buffer
351  size_t choff; //< offset within the current buffer
352  size_t dgindex; //< index of the current digest buffer
353  size_t dgoff; //< offset within the current digest buffer
354 
355  std::vector<iovec> iov; //< I/O vector
356  int iovcnt; //< size of the I/O vector
357  size_t iovindex; //< index of the first valid element in the I/O vector
358 
359  static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
360 };
361 
362 } /* namespace XrdEc */
363 
364 #endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
ServerResponseStatus status
Definition: XProtocol.hh:1305
uint32_t dlen
Definition: XrdClAsyncPageReader.hh:347
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:1055
void addiov(char *&buf, uint32_t len, uint32_t &dleft)
Add I/O buffer to the vector and update number of bytes left to be read.
Definition: XrdClAsyncPageReader.hh:179
Helper class for retrieving the maximum size of the I/O vector.
Definition: XrdClAsyncPageReader.hh:137
char * data
Definition: XrdOucIOVec.hh:45
ChunkList & chunks
Definition: XrdClAsyncPageReader.hh:345
void InitIOV()
Initialize the I/O vector.
Definition: XrdClAsyncPageReader.hh:215
static const int PageWithDigest
Definition: XrdClAsyncPageReader.hh:359
ServerResponseBody_pgRead pgread
Definition: XProtocol.hh:1308
XRootDStatus Read(Socket &socket, uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:105
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1257
static uint32_t CalcIOVSize(uint32_t dleft)
Definition: XrdClAsyncPageReader.hh:190
iovmax_t()
Definition: XrdClAsyncPageReader.hh:139
Definition: XProtocol.hh:1303
kXR_int64 offset
Definition: XProtocol.hh:1056
size_t choff
Definition: XrdClAsyncPageReader.hh:351
void * buffer
length of the chunk
Definition: XrdClXRootDResponses.hh:950
void SetRsp(ServerResponseV2 *rsp)
Sets message data size.
Definition: XrdClAsyncPageReader.hh:79
Definition: XrdOucIOVec.hh:65
std::vector< iovec > iov
Definition: XrdClAsyncPageReader.hh:355
void shift(void *&buffer, size_t nbbts)
Shift buffer by a number of bytes.
Definition: XrdClAsyncPageReader.hh:262
kXR_int32 dlen
Definition: XProtocol.hh:1236
void shiftpgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:299
static const int PageSize
Definition: XrdSysPageSize.hh:36
void addiov(char *&buf, size_t len)
Add I/O buffer to the vector.
Definition: XrdClAsyncPageReader.hh:167
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:916
Request status.
Definition: XrdClXRootDResponses.hh:218
size_t chindex
Definition: XrdClAsyncPageReader.hh:350
void ShiftIOV(uint32_t btsread)
shift the I/O vector by the number of bytes read
Definition: XrdClAsyncPageReader.hh:320
void shiftdgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:274
std::vector< uint32_t > & digests
Definition: XrdClAsyncPageReader.hh:346
size_t iovindex
Definition: XrdClAsyncPageReader.hh:357
uint64_t rspoff
Definition: XrdClAsyncPageReader.hh:348
uint32_t CalcRdSize()
Calculate the size of the data to be read.
Definition: XrdClAsyncPageReader.hh:199
uint64_t offset
Definition: XrdClXRootDResponses.hh:948
int32_t value
Definition: XrdClAsyncPageReader.hh:152
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
Definition: XrdClAsyncPageReader.hh:47
union ServerResponseV2::@1 info
int iovcnt
Definition: XrdClAsyncPageReader.hh:356
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Object for reading out data from the PgRead response.
Definition: XrdClAsyncPageReader.hh:37
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset &amp; length.
A network socket.
Definition: XrdClSocket.hh:42
static int max_iovcnt()
Definition: XrdClAsyncPageReader.hh:158
size_t dgindex
Definition: XrdClAsyncPageReader.hh:352
size_t dgoff
Definition: XrdClAsyncPageReader.hh:353
virtual ~AsyncPageReader()
Destructor.
Definition: XrdClAsyncPageReader.hh:72