xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncVectorReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_
21 
23 #include "XrdCl/XrdClSocket.hh"
24 #include "XrdCl/XrdClStream.hh"
25 
26 namespace XrdCl
27 {
28 
29  //----------------------------------------------------------------------------
31  //----------------------------------------------------------------------------
33  {
34  public:
35  //------------------------------------------------------------------------
39  //------------------------------------------------------------------------
42  url( url ),
43  chunks( nullptr ),
44  dlen( 0 ),
45  totalbtsrd( 0 ),
46  chidx( 0 ),
47  choff( 0 ),
48  chlen( 0 ),
49  rdlstoff( 0 ),
50  rdlstlen( 0 )
51  {
52  memset( &rdlst, 0, sizeof( readahead_list ) );
53  }
54 
55  //------------------------------------------------------------------------
57  //------------------------------------------------------------------------
59  {
60  }
61 
62  //------------------------------------------------------------------------
64  //------------------------------------------------------------------------
65  void SetDataLength( int dlen )
66  {
67  this->dlen = dlen;
68  this->totalbtsrd = 0;
69  this->readstage = ReadStart;
70  }
71 
72  //------------------------------------------------------------------------
74  //------------------------------------------------------------------------
76  {
77  this->chunks = chunks;
78  this->chstatus.resize( chunks->size() );
79  }
80 
81  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  XRootDStatus Read( Socket &socket, uint32_t &btsret )
89  {
90  Log *log = DefaultEnv::GetLog();
91 
92  while( true )
93  {
94  switch( readstage )
95  {
96  //------------------------------------------------------------------
97  // Prepare to readout a new response
98  //------------------------------------------------------------------
99  case ReadStart:
100  {
101  rdlstoff = 0;
102  rdlstlen = sizeof( readahead_list );
104  continue;
105  }
106 
107  //------------------------------------------------------------------
108  // Readout the read_list
109  //------------------------------------------------------------------
110  case ReadRdLst:
111  {
112  //----------------------------------------------------------------
113  // We cannot afford to read the next header from the stream
114  // because we will cross the message boundary
115  //----------------------------------------------------------------
116  if( totalbtsrd + rdlstlen > dlen )
117  {
118  uint32_t btsleft = dlen - totalbtsrd;
119  log->Error( XRootDMsg, "[%s] ReadRawReadV: No enough data to read "
120  "another chunk header. Discarding %d bytes.",
121  url.GetHostId().c_str(), btsleft );
123  continue;
124  }
125 
126  //----------------------------------------------------------------
127  // Let's readout the read list record from the socket
128  //----------------------------------------------------------------
129  uint32_t btsrd = 0;
130  char *buff = reinterpret_cast<char*>( &rdlst );
131  Status st = ReadBytesAsync( socket, buff + rdlstoff, rdlstlen, btsrd );
132  rdlstoff += btsrd;
133  rdlstlen -= btsrd;
134  totalbtsrd += btsrd;
135  btsret += btsrd;
136 
137  if( !st.IsOK() || st.code == suRetry )
138  return st;
139 
140  //----------------------------------------------------------------
141  // We have a complete read list record, now we need to marshal it
142  //----------------------------------------------------------------
143  rdlst.rlen = ntohl( rdlst.rlen );
144  rdlst.offset = ntohll( rdlst.offset );
145  choff = 0;
146  chlen = rdlst.rlen;
147 
148  //----------------------------------------------------------------
149  // Find the buffer corresponding to the chunk
150  //----------------------------------------------------------------
151  bool chfound = false;
152  for( size_t i = 0; i < chunks->size(); ++i )
153  {
154  if( ( *chunks )[i].offset == uint64_t( rdlst.offset ) &&
155  ( *chunks )[i].length == uint32_t( rdlst.rlen ) )
156  {
157  chfound = true;
158  chidx = i;
159  break;
160  }
161  }
162 
163  //----------------------------------------------------------------
164  // If the chunk was not found this is a bogus response, switch
165  // to discard mode
166  //----------------------------------------------------------------
167  if( !chfound )
168  {
169  log->Error( XRootDMsg, "[%s] ReadRawReadV: Impossible to find chunk "
170  "buffer corresponding to %d bytes at %ld",
171  url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
172  uint32_t btsleft = dlen - totalbtsrd;
173  log->Dump( XRootDMsg, "[%s] ReadRawReadV: Discarding %d bytes",
174  url.GetHostId().c_str(), btsleft );
176  continue;
177  }
178 
180  continue;
181  }
182 
183  //------------------------------------------------------------------
184  // Readout the raw data
185  //------------------------------------------------------------------
186  case ReadChunk:
187  {
188  //----------------------------------------------------------------
189  // The chunk was found, but reading all the data will cross the
190  // message boundary
191  //----------------------------------------------------------------
192  if( totalbtsrd + chlen > dlen )
193  {
194  uint32_t btsleft = dlen - totalbtsrd;
195  log->Error( XRootDMsg, "[%s] ReadRawReadV: Malformed chunk header: "
196  "reading %d bytes from message would cross the message "
197  "boundary, discarding %d bytes.", url.GetHostId().c_str(),
198  rdlst.rlen, btsleft );
199  chstatus[chidx].sizeerr = true;
201  continue;
202  }
203 
204  //----------------------------------------------------------------
205  // Readout the raw data from the socket
206  //----------------------------------------------------------------
207  uint32_t btsrd = 0;
208  char *buff = static_cast<char*>( ( *chunks )[chidx].buffer );
209  Status st = ReadBytesAsync( socket, buff + choff, chlen, btsrd );
210  choff += btsrd;
211  chlen -= btsrd;
212  totalbtsrd += btsrd;
213  btsret += btsrd;
214 
215  if( !st.IsOK() || st.code == suRetry )
216  return st;
217 
218  log->Dump( XRootDMsg, "[%s] ReadRawReadV: read buffer for chunk %d@%ld",
219  url.GetHostId().c_str(), rdlst.rlen, rdlst.offset );
220 
221  //----------------------------------------------------------------
222  // Mark chunk as done
223  //----------------------------------------------------------------
224  chstatus[chidx].done = true;
225 
226  //----------------------------------------------------------------
227  // There is still data to be read, we need to readout the next
228  // read list record.
229  //----------------------------------------------------------------
230  if( totalbtsrd < dlen )
231  {
232  rdlstoff = 0;
233  rdlstlen = sizeof( readahead_list );
235  continue;
236  }
237 
239  continue;
240  }
241 
242  //------------------------------------------------------------------
243  // We've had an error and we are in the discarding mode
244  //------------------------------------------------------------------
245  case ReadDiscard:
246  {
247  uint32_t btsleft = dlen - totalbtsrd;
248  // allocate the discard buffer if necessary
249  if( discardbuff.size() < btsleft )
250  discardbuff.resize( btsleft );
251 
252  //----------------------------------------------------------------
253  // We need to readout the data from the socket in order to keep
254  // the stream sane.
255  //----------------------------------------------------------------
256  uint32_t btsrd = 0;
257  Status st = ReadBytesAsync( socket, discardbuff.data(), btsleft, btsrd );
258  totalbtsrd += btsrd;
259  btsret += btsrd;
260 
261  log->Warning( XRootDMsg, "[%s] ReadRawReadV: Discarded %d bytes",
262  url.GetHostId().c_str(), btsrd );
263 
264  if( !st.IsOK() || st.code == suRetry )
265  return st;
266 
268  continue;
269  }
270 
271  //------------------------------------------------------------------
272  // Finalize the read
273  //------------------------------------------------------------------
274  case ReadDone:
275  {
276  chidx = 0;
277  choff = 0;
278  chlen = 0;
279  rdlstoff = 0;
280  rdlstlen = 0;
281  break;
282  }
283  }
284 
285  // just in case
286  break;
287  }
288  //----------------------------------------------------------------------
289  // We are done
290  //----------------------------------------------------------------------
291  return XRootDStatus();
292  }
293 
295  {
296  //--------------------------------------------------------------------------
297  // See if all the chunks are OK and put them in the response
298  //--------------------------------------------------------------------------
299  std::unique_ptr<VectorReadInfo> ptr( new VectorReadInfo() );
300  uint32_t size = 0;
301  for( uint32_t i = 0; i < chunks->size(); ++i )
302  {
303  if( !chstatus[i].done )
304  return Status( stFatal, errInvalidResponse );
305  ptr->GetChunks().emplace_back( ( *chunks )[i].offset,
306  ( *chunks )[i].length, ( *chunks )[i].buffer );
307  size += ( *chunks )[i].length;
308  }
309  ptr->SetSize( size );
310  info = ptr.release();
311  return Status();
312  }
313 
314  private:
315 
316  //--------------------------------------------------------------------------
317  // Read a buffer asynchronously - depends on pAsyncBuffer, pAsyncSize
318  // and pAsyncOffset
319  //--------------------------------------------------------------------------
320  Status ReadBytesAsync( Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead )
321  {
322  size_t shift = 0;
323  while( toBeRead > 0 )
324  {
325  int btsRead = 0;
326  Status status = socket.Read( buffer + shift, toBeRead, btsRead );
327 
328  if( !status.IsOK() || status.code == suRetry )
329  return status;
330 
331  bytesRead += btsRead;
332  toBeRead -= btsRead;
333  shift += btsRead;
334  }
335  return Status( stOK, suDone );
336  }
337 
338  //------------------------------------------------------------------------
339  // Helper struct for async reading of chunks
340  //------------------------------------------------------------------------
341  struct ChunkStatus
342  {
343  ChunkStatus(): sizeerr( false ), done( false ) {}
344  bool sizeerr;
345  bool done;
346  };
347 
348  //------------------------------------------------------------------------
349  // internal buffer type
350  //------------------------------------------------------------------------
351  using buffer_t = std::vector<char>;
352 
353  //------------------------------------------------------------------------
355  //------------------------------------------------------------------------
356  enum Stage
357  {
358  ReadStart, //< the next step is to initialize the read
359  ReadRdLst, //< the next step is to read the read_list
360  ReadChunk, //< the next step is to read the raw data
361  ReadDiscard, //< there was an error, we are in discard mode
362  ReadDone //< the next step is to finalize the read
363  };
364 
365  //------------------------------------------------------------------------
366  // Current read stage
367  //------------------------------------------------------------------------
369 
370  //------------------------------------------------------------------------
371  // The context of the read operation
372  //------------------------------------------------------------------------
373  const URL &url; //< for logging purposes
374 
375  ChunkList *chunks; //< list of data chunks to be filled with user data
376  std::vector<ChunkStatus> chstatus; //< status per chunk
377  uint32_t dlen; //< size of the data in the message
378  uint32_t totalbtsrd; //< total number of bytes read out from the socket
379 
380  size_t chidx; //< index of the current data buffer
381  size_t choff; //< offset within the current buffer
382  size_t chlen; //< bytes left to be readout into the current chunk
383 
384  size_t rdlstoff; //< offset within the current read_list
385  readahead_list rdlst; //< the readahead list for the current chunk
386  size_t rdlstlen; //< bytes left to be readout into read list
387 
388  buffer_t discardbuff; //< buffer for discarding data in case of an error
389  };
390 
391 } /* namespace XrdCl */
392 
393 #endif /* SRC_XRDCL_XRDCLASYNCVECTORREADER_HH_ */
const uint16_t errInvalidResponse
Definition: XrdClStatus.hh:99
kXR_int64 offset
Definition: XProtocol.hh:656
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:1046
Definition: XProtocol.hh:653
readahead_list rdlst
Definition: XrdClAsyncVectorReader.hh:385
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Status GetVectorReadInfo(VectorReadInfo *&info)
Definition: XrdClAsyncVectorReader.hh:294
const uint16_t suDone
Definition: XrdClStatus.hh:38
kXR_int32 rlen
Definition: XProtocol.hh:655
size_t chlen
Definition: XrdClAsyncVectorReader.hh:382
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:146
Definition: XrdClAsyncVectorReader.hh:341
bool sizeerr
Definition: XrdClAsyncVectorReader.hh:344
XRootDStatus Read(Socket &socket, uint32_t &btsret)
Definition: XrdClAsyncVectorReader.hh:88
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Procedure execution status.
Definition: XrdClStatus.hh:113
const uint64_t XRootDMsg
Definition: XrdClConstants.hh:39
Definition: XrdClAsyncVectorReader.hh:360
static Log * GetLog()
Get default log.
buffer_t discardbuff
Definition: XrdClAsyncVectorReader.hh:388
uint32_t totalbtsrd
Definition: XrdClAsyncVectorReader.hh:378
Object for reading out data from the VectorRead response.
Definition: XrdClAsyncVectorReader.hh:32
const uint16_t stFatal
Fatal error, it&#39;s still an error.
Definition: XrdClStatus.hh:33
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncVectorReader.hh:356
size_t rdlstlen
Definition: XrdClAsyncVectorReader.hh:386
Request status.
Definition: XrdClXRootDResponses.hh:218
Stage readstage
Definition: XrdClAsyncVectorReader.hh:368
bool done
Definition: XrdClAsyncVectorReader.hh:345
virtual ~AsyncVectorReader()
Destructor.
Definition: XrdClAsyncVectorReader.hh:58
Status ReadBytesAsync(Socket &socket, char *buffer, uint32_t toBeRead, uint32_t &bytesRead)
Definition: XrdClAsyncVectorReader.hh:320
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition: XrdClURL.hh:94
AsyncVectorReader(const URL &url)
Definition: XrdClAsyncVectorReader.hh:40
size_t choff
Definition: XrdClAsyncVectorReader.hh:381
ChunkList * chunks
Definition: XrdClAsyncVectorReader.hh:375
Definition: XrdClAsyncVectorReader.hh:362
size_t rdlstoff
Definition: XrdClAsyncVectorReader.hh:384
std::vector< char > buffer_t
Definition: XrdClAsyncVectorReader.hh:351
ChunkStatus()
Definition: XrdClAsyncVectorReader.hh:343
Vector read info.
Definition: XrdClXRootDResponses.hh:1051
URL representation.
Definition: XrdClURL.hh:30
void Error(uint64_t topic, const char *format,...)
Report an error.
uint32_t dlen
Definition: XrdClAsyncVectorReader.hh:377
Definition: XrdClAsyncVectorReader.hh:359
Definition: XrdClAsyncVectorReader.hh:361
size_t chidx
Definition: XrdClAsyncVectorReader.hh:380
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const URL & url
Definition: XrdClAsyncVectorReader.hh:373
XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:123
void SetDataLength(int dlen)
Sets response data length.
Definition: XrdClAsyncVectorReader.hh:65
void SetChunkList(ChunkList *chunks)
Sets the chunk list with user buffers.
Definition: XrdClAsyncVectorReader.hh:75
A network socket.
Definition: XrdClSocket.hh:42
Handle diagnostics.
Definition: XrdClLog.hh:100
std::vector< ChunkStatus > chstatus
Definition: XrdClAsyncVectorReader.hh:376
Definition: XrdClAsyncVectorReader.hh:358