xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
21 
23 #include "XrdCl/XrdClSocket.hh"
25 #include "XrdSys/XrdSysPageSize.hh"
26 
27 #include <sys/uio.h>
28 #include <memory>
29 #include <arpa/inet.h>
30 
31 namespace XrdCl
32 {
33 
34 //------------------------------------------------------------------------------
36 //------------------------------------------------------------------------------
38 {
39  public:
40 
41  //--------------------------------------------------------------------------
49  //--------------------------------------------------------------------------
51  std::vector<uint32_t> &digests ) :
52  chunks( chunks ),
53  digests( digests ),
54  dlen( 0 ),
55  rspoff( 0 ),
56  chindex( 0 ),
57  choff( 0 ),
58  dgindex( 0 ),
59  dgoff( 0 ),
60  iovcnt( 0 ),
61  iovindex( 0 )
62  {
63  uint64_t rdoff = chunks.front().offset;
64  uint32_t rdlen = 0;
65  for( auto &ch : chunks )
66  rdlen += ch.length;
67  int fpglen, lpglen;
68  int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
69  digests.resize( pgcnt );
70  }
71 
72  //--------------------------------------------------------------------------
74  //--------------------------------------------------------------------------
75  virtual ~AsyncPageReader()
76  {
77  }
78 
79  //--------------------------------------------------------------------------
81  //--------------------------------------------------------------------------
82  void SetRsp( ServerResponseV2 *rsp )
83  {
84  dlen = rsp->status.bdy.dlen;
85  rspoff = rsp->info.pgread.offset;
86 
87  uint64_t bufoff = rspoff - chunks[0].offset;
88  chindex = 0;
89 
90  for( chindex = 0; chindex < chunks.size(); ++chindex )
91  {
92  if( chunks[chindex].length < bufoff )
93  {
94  bufoff -= chunks[chindex].length;
95  continue;
96  }
97  break;
98  }
99  choff = bufoff;
100  }
101 
102  //--------------------------------------------------------------------------
106  //--------------------------------------------------------------------------
107  XRootDStatus Read( Socket &socket, uint32_t &btsread )
108  {
109  if( dlen == 0 || chindex >= chunks.size() )
110  return XRootDStatus();
111  btsread = 0;
112  int nbbts = 0;
113  do
114  {
115  // Prepare the IO vector for receiving the data
116  if( iov.empty() )
117  InitIOV();
118  // read the data into the buffer
119  nbbts = 0;
120  auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
121  if( !st.IsOK() )
122  return st;
123  btsread += nbbts;
124  dlen -= nbbts;
125  ShiftIOV( nbbts );
126  if( st.code == suRetry )
127  return st;
128  }
129  while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
130 
131  return XRootDStatus();
132  }
133 
134  private:
135 
136  //--------------------------------------------------------------------------
138  //--------------------------------------------------------------------------
//! Helper class for retrieving the maximum size of the I/O vector
//!
//! The limit is taken from sysconf(_SC_IOV_MAX) when that name is
//! available; if it is not, or the call returns -1, the compile-time
//! IOV_MAX is used, and 1024 if that is not defined either. The result is
//! rounded down to an even number.
struct iovmax_t
{
  iovmax_t()
  {
#ifdef _SC_IOV_MAX
    value = sysconf(_SC_IOV_MAX);
    if (value == -1)
#endif
#ifdef IOV_MAX
      value = IOV_MAX;
#else
      value = 1024;
#endif
    value &= ~uint32_t( 1 ); // make sure it is an even number
  }
  int32_t value; //< maximum number of elements in an I/O vector (even)
};
156 
157  //--------------------------------------------------------------------------
159  //--------------------------------------------------------------------------
160  inline static int max_iovcnt()
161  {
162  static iovmax_t iovmax;
163  return iovmax.value;
164  }
165 
166  //--------------------------------------------------------------------------
168  //--------------------------------------------------------------------------
169  inline void addiov( char *&buf, size_t len )
170  {
171  iov.emplace_back();
172  iov.back().iov_base = buf;
173  iov.back().iov_len = len;
174  buf += len;
175  ++iovcnt;
176  }
177 
178  //--------------------------------------------------------------------------
180  //--------------------------------------------------------------------------
181  inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
182  {
183  if( len > dleft ) len = dleft;
184  addiov( buf, len );
185  dleft -= len;
186  }
187 
188  //--------------------------------------------------------------------------
191  //--------------------------------------------------------------------------
192  inline static uint32_t CalcIOVSize( uint32_t dleft )
193  {
194  uint32_t ret = ( dleft / PageWithDigest + 2 ) * 2;
195  return ( ret > uint32_t( max_iovcnt() ) ? max_iovcnt() : ret );
196  }
197 
198  //--------------------------------------------------------------------------
200  //--------------------------------------------------------------------------
201  uint32_t CalcRdSize()
202  {
203  // data size in the server response (including digests)
204  uint32_t dleft = dlen;
205  // space in our page buffer
206  uint32_t pgspace = chunks[chindex].length - choff;
207  // space in our digest buffer
208  uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
209  if( dleft > pgspace + dgspace )
210  dleft = pgspace + dgspace;
211  return dleft;
212  }
213 
214  //--------------------------------------------------------------------------
216  //--------------------------------------------------------------------------
217  void InitIOV()
218  {
219  iovindex = 0;
220  // figure out the number of data we can read in one go
221  uint32_t dleft = CalcRdSize();
222  // and reset the I/O vector
223  iov.clear();
224  iovcnt = 0;
225  iov.reserve( CalcIOVSize( dleft ) );
226  // now prepare the page and digest buffers
227  ChunkInfo ch = chunks[chindex];
228  char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
229  uint64_t rdoff = ch.offset + choff;
230  char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
231  // handle the first digest
232  uint32_t fdglen = sizeof( uint32_t ) - dgoff;
233  addiov( dgbuf, fdglen, dleft );
234  if( dleft == 0 || iovcnt >= max_iovcnt() )
235  return;
236  // handle the first page
237  uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
238  addiov( pgbuf, fpglen, dleft );
239  if( dleft == 0 || iovcnt >= max_iovcnt() )
240  return;
241  // handle all the subsequent aligned pages
242  size_t fullpgs = dleft / PageWithDigest;
243  for( size_t i = 0; i < fullpgs; ++i )
244  {
245  addiov( dgbuf, sizeof( uint32_t ), dleft );
246  if( dleft == 0 || iovcnt >= max_iovcnt() )
247  return;
248  addiov( pgbuf, XrdSys::PageSize, dleft );
249  if( dleft == 0 || iovcnt >= max_iovcnt() )
250  return;
251  }
252  // handle the last digest
253  uint32_t ldglen = sizeof( uint32_t );
254  addiov( dgbuf, ldglen, dleft );
255  if( dleft == 0 || iovcnt >= max_iovcnt() )
256  return;
257  // handle the last page
258  addiov( pgbuf, dleft, dleft );
259  }
260 
261  //--------------------------------------------------------------------------
263  //--------------------------------------------------------------------------
//! Shift buffer by a number of bytes
//!
//! @param buffer : pointer to be advanced (updated in place)
//! @param nbbts  : number of bytes to advance by
inline void shift( void *&buffer, size_t nbbts )
{
  buffer = static_cast<char*>( buffer ) + nbbts;
}
270 
271  //--------------------------------------------------------------------------
275  //--------------------------------------------------------------------------
276  inline void shiftdgbuf( uint32_t &btsread )
277  {
278  if( iov[iovindex].iov_len > btsread )
279  {
280  iov[iovindex].iov_len -= btsread;
281  shift( iov[iovindex].iov_base, btsread );
282  dgoff += btsread;
283  btsread = 0;
284  return;
285  }
286 
287  btsread -= iov[iovindex].iov_len;
288  iov[iovindex].iov_len = 0;
289  dgoff = 0;
290  digests[dgindex] = ntohl( digests[dgindex] );
291  ++dgindex;
292  ++iovindex;
293  --iovcnt;
294  }
295 
296  //--------------------------------------------------------------------------
300  //--------------------------------------------------------------------------
301  inline void shiftpgbuf( uint32_t &btsread )
302  {
303  if( iov[iovindex].iov_len > btsread )
304  {
305  iov[iovindex].iov_len -= btsread;
306  shift( iov[iovindex].iov_base, btsread );
307  choff += btsread;
308  btsread = 0;
309  return;
310  }
311 
312  btsread -= iov[iovindex].iov_len;
313  choff += iov[iovindex].iov_len;
314  iov[iovindex].iov_len = 0;
315  ++iovindex;
316  --iovcnt;
317  }
318 
319  //--------------------------------------------------------------------------
321  //--------------------------------------------------------------------------
322  void ShiftIOV( uint32_t btsread )
323  {
324  // if iovindex is even it point to digest, otherwise it points to a page
325  if( iovindex % 2 == 0 )
326  shiftdgbuf( btsread );
327  // adjust as many I/O buffers as necessary
328  while( btsread > 0 )
329  {
330  // handle page
331  shiftpgbuf( btsread );
332  if( btsread == 0 ) break;
333  // handle digest
334  shiftdgbuf( btsread );
335  }
336  // if we filled the buffer, move to the next one
337  if( iovcnt == 0 )
338  iov.clear();
339  // do we need to move to the next chunk?
340  if( choff >= chunks[chindex].length )
341  {
342  ++chindex;
343  choff = 0;
344  }
345  }
346 
347  ChunkList &chunks; //< list of data chunks to be filled with user data
348  std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
349  uint32_t dlen; //< size of the data in the message
350  uint64_t rspoff; //< response offset
351 
352  size_t chindex; //< index of the current data buffer
353  size_t choff; //< offset within the current buffer
354  size_t dgindex; //< index of the current digest buffer
355  size_t dgoff; //< offset within the current digest buffer
356 
357  std::vector<iovec> iov; //< I/O vector
358  int iovcnt; //< size of the I/O vector
359  size_t iovindex; //< index of the first valid element in the I/O vector
360 
361  static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
362 };
363 
364 } /* namespace XrdCl */
365 
366 #endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
ServerResponseStatus status
Definition: XProtocol.hh:1305
uint32_t dlen
Definition: XrdClAsyncPageReader.hh:349
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:1046
void addiov(char *&buf, uint32_t len, uint32_t &dleft)
Add I/O buffer to the vector and update number of bytes left to be read.
Definition: XrdClAsyncPageReader.hh:181
Helper class for retrieving the maximum size of the I/O vector.
Definition: XrdClAsyncPageReader.hh:139
char * data
Definition: XrdOucIOVec.hh:45
ChunkList & chunks
Definition: XrdClAsyncPageReader.hh:347
void InitIOV()
Initialize the I/O vector.
Definition: XrdClAsyncPageReader.hh:217
static const int PageWithDigest
Definition: XrdClAsyncPageReader.hh:361
ServerResponseBody_pgRead pgread
Definition: XProtocol.hh:1308
XRootDStatus Read(Socket &socket, uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:107
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1257
static uint32_t CalcIOVSize(uint32_t dleft)
Definition: XrdClAsyncPageReader.hh:192
iovmax_t()
Definition: XrdClAsyncPageReader.hh:141
Definition: XProtocol.hh:1303
kXR_int64 offset
Definition: XProtocol.hh:1056
size_t choff
Definition: XrdClAsyncPageReader.hh:353
void * buffer
length of the chunk
Definition: XrdClXRootDResponses.hh:941
void SetRsp(ServerResponseV2 *rsp)
Sets message data size.
Definition: XrdClAsyncPageReader.hh:82
Definition: XrdOucIOVec.hh:65
std::vector< iovec > iov
Definition: XrdClAsyncPageReader.hh:357
void shift(void *&buffer, size_t nbbts)
Shift buffer by a number of bytes.
Definition: XrdClAsyncPageReader.hh:264
kXR_int32 dlen
Definition: XProtocol.hh:1236
void shiftpgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:301
static const int PageSize
Definition: XrdSysPageSize.hh:36
void addiov(char *&buf, size_t len)
Add I/O buffer to the vector.
Definition: XrdClAsyncPageReader.hh:169
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:907
Request status.
Definition: XrdClXRootDResponses.hh:218
size_t chindex
Definition: XrdClAsyncPageReader.hh:352
void ShiftIOV(uint32_t btsread)
shift the I/O vector by the number of bytes read
Definition: XrdClAsyncPageReader.hh:322
void shiftdgbuf(uint32_t &btsread)
Definition: XrdClAsyncPageReader.hh:276
std::vector< uint32_t > & digests
Definition: XrdClAsyncPageReader.hh:348
size_t iovindex
Definition: XrdClAsyncPageReader.hh:359
uint64_t rspoff
Definition: XrdClAsyncPageReader.hh:350
uint32_t CalcRdSize()
Calculate the size of the data to be read.
Definition: XrdClAsyncPageReader.hh:201
uint64_t offset
Definition: XrdClXRootDResponses.hh:939
int32_t value
Definition: XrdClAsyncPageReader.hh:154
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
Definition: XrdClAsyncPageReader.hh:50
union ServerResponseV2::@1 info
int iovcnt
Definition: XrdClAsyncPageReader.hh:358
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Object for reading out data from the PgRead response.
Definition: XrdClAsyncPageReader.hh:37
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
A network socket.
Definition: XrdClSocket.hh:42
static int max_iovcnt()
Definition: XrdClAsyncPageReader.hh:160
size_t dgindex
Definition: XrdClAsyncPageReader.hh:354
size_t dgoff
Definition: XrdClAsyncPageReader.hh:355
virtual ~AsyncPageReader()
Destructor.
Definition: XrdClAsyncPageReader.hh:75