xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
XrdClAsyncMsgReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGREADER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 
29 #include <memory>
30 
31 namespace XrdCl
32 {
33  //----------------------------------------------------------------------------
34  //! Utility class encapsulating reading response message logic
35  //----------------------------------------------------------------------------
37  {
38  public:
39  //------------------------------------------------------------------------
47  //------------------------------------------------------------------------
49  Socket &socket,
50  const std::string &strmname,
51  Stream &strm,
52  uint16_t substrmnb) : readstage( ReadStart ),
53  xrdTransport( xrdTransport ),
54  socket( socket ),
55  strmname( strmname ),
56  strm( strm ),
57  substrmnb( substrmnb ),
58  inmsgsize( 0 ),
59  inhandler( nullptr )
60  {
61  }
62 
63  //------------------------------------------------------------------------
64  //! Destructor
65  //------------------------------------------------------------------------
66  virtual ~AsyncMsgReader(){ } // empty body: nothing is released explicitly here; inhandler is not deleted by this class
67 
68  //------------------------------------------------------------------------
69  //! Reset the state of the object (makes it ready to read out next msg)
70  //------------------------------------------------------------------------
71  inline void Reset() // clears the per-message state kept by this reader
72  { // NOTE(review): listing line 73 is absent from this extract, so any statement it held is not visible here - confirm against the original header
74  inmsg.reset(); // drop this reader's reference to the message (shared_ptr reset)
75  inmsgsize = 0; // restart the size bookkeeping for the next message
76  inhandler = nullptr; // forget the handler pointer; it is only cleared, not deleted
77  }
78 
79  //------------------------------------------------------------------------
80  //! Read out the response from the socket
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85 
86  while( true )
87  {
88  switch( readstage )
89  {
90  //------------------------------------------------------------------
91  // There is no incoming message currently being processed so we
92  // create a new one
93  //------------------------------------------------------------------
94  case ReadStart:
95  {
96  inmsg = std::make_shared<Message>();
97  //----------------------------------------------------------------
98  // The next step is to read the header
99  //----------------------------------------------------------------
101  continue;
102  }
103  //------------------------------------------------------------------
104  // We need to read the header
105  //------------------------------------------------------------------
106  case ReadHeader:
107  {
109  if( !st.IsOK() || st.code == suRetry )
110  return st;
111 
112  log->Dump( AsyncSockMsg, "[%s] Received message header for 0x%x size: %d",
113  strmname.c_str(), inmsg.get(), inmsg->GetCursor() );
114 
115  ServerResponse *rsp = (ServerResponse*)inmsg->GetBuffer();
116  if( rsp->hdr.status == kXR_attn )
117  {
118  log->Dump( AsyncSockMsg, "[%s] Will readout the attn action code "
119  "of message 0x%x", strmname.c_str(), inmsg.get() );
120  inmsg->ReAllocate( 16 ); // header (bytes 8) + action code (8 bytes)
122  continue;
123  }
124 
125  inmsgsize = inmsg->GetCursor();
127 
128  if( inhandler )
129  {
130  log->Dump( AsyncSockMsg, "[%s] Will use the raw handler to read body "
131  "of message 0x%x", strmname.c_str(), inmsg.get() );
132  //--------------------------------------------------------------
133  // The next step is to read raw data
134  //--------------------------------------------------------------
136  continue;
137  }
138  //----------------------------------------------------------------
139  // The next step is to read the message body
140  //----------------------------------------------------------------
142  continue;
143  }
144  //------------------------------------------------------------------
145  // Before proceeding we need to figure out the attn action code
146  //------------------------------------------------------------------
147  case ReadAttn:
148  {
150  if( !st.IsOK() || st.code == suRetry )
151  return st;
152 
153  //----------------------------------------------------------------
154  // There is an embedded response, overwrite the message with that
155  //----------------------------------------------------------------
156  if( HasEmbeddedRsp() )
157  {
158  inmsg->Free();
160  continue;
161  }
162 
163  //----------------------------------------------------------------
164  // Readout the rest of the body
165  //----------------------------------------------------------------
166  inmsgsize = inmsg->GetCursor();
168  continue;
169  }
170  //------------------------------------------------------------------
171  // We need to call a raw message handler to get the data from the
172  // socket
173  //------------------------------------------------------------------
174  case ReadRawData:
175  {
176  uint32_t bytesRead = 0;
177  XRootDStatus st = inhandler->ReadMessageBody( inmsg.get(), &socket, bytesRead );
178  if( !st.IsOK() )
179  return st;
180  inmsgsize += bytesRead;
181  if( st.code == suRetry )
182  return st;
183  //----------------------------------------------------------------
184  // The next step is to finalize the read
185  //----------------------------------------------------------------
187  continue;
188  }
189  //------------------------------------------------------------------
190  // No raw handler, so we read the message to the buffer
191  //------------------------------------------------------------------
192  case ReadMsgBody:
193  {
195  if( !st.IsOK() || st.code == suRetry )
196  return st;
197  inmsgsize = inmsg->GetCursor();
198 
199  //----------------------------------------------------------------
200  // Now check if there are some additional raw data to be read
201  //----------------------------------------------------------------
202  if( inhandler )
203  {
204  //--------------------------------------------------------------
205  // The next step is to finalize the read
206  //--------------------------------------------------------------
208  continue;
209  }
210 
211  uint16_t action = strm.InspectStatusRsp( substrmnb,
212  inhandler );
213 
214  if( action & MsgHandler::Corrupted )
216 
217  if( action & MsgHandler::Raw )
218  {
219  //--------------------------------------------------------------
220  // The next step is to read the raw data
221  //--------------------------------------------------------------
223  continue;
224  }
225 
226  if( action & MsgHandler::More )
227  {
228  //--------------------------------------------------------------
229  // The next step is to read the additional data in the message
230  // body
231  //--------------------------------------------------------------
233  continue;
234  }
235 
236  //----------------------------------------------------------------
237  // Unless we've got a kXR_status message and no handler the
238  // read is done
239  //----------------------------------------------------------------
240  ServerResponse *rsphdr = (ServerResponse *)inmsg->GetBuffer();
241  if( !( action & MsgHandler::RemoveHandler ) ||
242  rsphdr->hdr.status != kXR_status ||
243  inmsg->GetSize() < sizeof( ServerResponseStatus ) )
244  {
246  continue;
247  }
248 
249  //----------------------------------------------------------------
250  // There is no handler and we have a kXR_status message. If we
251  // have already read all the message then we're done.
252  //----------------------------------------------------------------
253  ServerResponseStatus *rspst = (ServerResponseStatus*)inmsg->GetBuffer();
254  const uint32_t hdrSize = rspst->hdr.dlen;
255  if( inmsg->GetSize() != hdrSize + 8 )
256  {
258  continue;
259  }
260 
261  //----------------------------------------------------------------
262  // Only the header of kXR_status has been read. Unmarshall the
263  // header and if there is more body data call GetBody() again.
264  //----------------------------------------------------------------
265  const uint16_t reqType = rspst->bdy.requestid + kXR_1stRequest;
267 
268  if( !st.IsOK() && st.code == errDataError )
269  {
270  log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
271  "corrupted status body in message 0x%x.",
272  strmname.c_str(), inmsg.get() );
274  }
275  if( !st.IsOK() )
276  {
277  log->Error( AsyncSockMsg, "[%s] Failed to unmarshall "
278  "status body of message 0x%x.",
279  strmname.c_str(), inmsg.get() );
281  continue;
282  }
283  if ( rspst->bdy.dlen != 0 )
284  {
286  continue;
287  }
288 
289  //----------------------------------------------------------------
290  // The next step is to finalize the read
291  //----------------------------------------------------------------
293  continue;
294  }
295 
296  case ReadDone:
297  {
298  //----------------------------------------------------------------
299  // Report the incoming message
300  //----------------------------------------------------------------
301  log->Dump( AsyncSockMsg, "[%s] Received message 0x%x of %d bytes",
302  strmname.c_str(), inmsg.get(), inmsgsize );
303 
304  strm.OnIncoming( substrmnb, std::move( inmsg ), inmsgsize );
305  }
306  }
307  // just in case
308  break;
309  }
310 
311  //----------------------------------------------------------------------
312  // We are done
313  //----------------------------------------------------------------------
314  return XRootDStatus();
315  }
316 
317  private:
318 
320  {
321  //----------------------------------------------------------------------
322  // Readout the action code from the socket. We are reading out 8 bytes
323  // into the message, the 8 byte header is already there.
324  //----------------------------------------------------------------------
325  size_t btsleft = 8 - ( inmsg->GetCursor() - 8 );
326  while( btsleft > 0 )
327  {
328  int btsrd = 0;
329  XRootDStatus st = socket.Read( inmsg->GetBufferAtCursor(), btsleft, btsrd );
330  if( !st.IsOK() || st.code == suRetry )
331  return st;
332  btsleft -= btsrd;
333  inmsg->AdvanceCursor( btsrd );
334  }
335 
336  //----------------------------------------------------------------------
337  // Marshal the action code
338  //----------------------------------------------------------------------
339  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 );
340  attn->actnum = ntohl( attn->actnum );
341 
342  return XRootDStatus();
343  }
344 
345  inline bool HasEmbeddedRsp() // returns true when the attn action code of the current message equals kXR_asynresp
346  {
347  ServerResponseBody_Attn *attn = (ServerResponseBody_Attn*)inmsg->GetBuffer( 8 ); // GetBuffer( 8 ): presumably the bytes following the 8-byte response header - confirm against Message
348  return ( attn->actnum == kXR_asynresp ); // NOTE(review): compares actnum as stored; assumes it was already converted with ntohl by the attn readout - confirm call order
349  }
350 
351  //------------------------------------------------------------------------
352  //! Stages of reading out a response from the socket
353  //------------------------------------------------------------------------
354  enum Stage
355  {
356  ReadStart, ///< the next step is to initialize the read
357  ReadHeader, ///< the next step is to read the header
358  ReadAttn, ///< the next step is to read attn action code
359  ReadMsgBody, ///< the next step is to read the body
360  ReadRawData, ///< the next step is to read the raw data
361  ReadDone ///< the next step is to finalize the read
362  };
363 
364  //------------------------------------------------------------------------
365  // Current read stage
366  //------------------------------------------------------------------------
368 
369  //------------------------------------------------------------------------
370  // The context of the read operation
371  //------------------------------------------------------------------------
374  const std::string &strmname;
376  uint16_t substrmnb;
377 
378 
379  //------------------------------------------------------------------------
380  // The internal state of the reader
381  //------------------------------------------------------------------------
382  std::shared_ptr<Message> inmsg; //< the ownership is shared with MsgHandler
383  uint32_t inmsgsize;
385 
386  };
387 
388 } /* namespace XrdCl */
389 
390 #endif /* SRC_XRDCL_XRDCLASYNCMSGREADER_HH_ */
Definition: XrdClAsyncMsgReader.hh:357
Definition: XrdClAsyncMsgReader.hh:358
Definition: XProtocol.hh:936
std::shared_ptr< Message > inmsg
Definition: XrdClAsyncMsgReader.hh:382
XRootDStatus Read()
Read out the response from the socket.
Definition: XrdClAsyncMsgReader.hh:82
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgReader.hh:372
Socket & socket
Definition: XrdClAsyncMsgReader.hh:373
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
bool HasEmbeddedRsp()
Definition: XrdClAsyncMsgReader.hh:345
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
virtual ~AsyncMsgReader()
Destructor.
Definition: XrdClAsyncMsgReader.hh:66
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
Definition: XrdClPostMasterInterfaces.hh:138
kXR_unt16 status
Definition: XProtocol.hh:913
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
Stage readstage
Definition: XrdClAsyncMsgReader.hh:367
AsyncMsgReader(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb)
Definition: XrdClAsyncMsgReader.hh:48
Definition: XProtocol.hh:1255
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1257
const std::string & strmname
Definition: XrdClAsyncMsgReader.hh:374
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClPostMasterInterfaces.hh:69
Definition: XrdClAsyncMsgReader.hh:360
XRootDStatus ReadAttnActnum()
Definition: XrdClAsyncMsgReader.hh:319
MsgHandler * inhandler
Definition: XrdClAsyncMsgReader.hh:384
static Log * GetLog()
Get default log.
Definition: XProtocol.hh:899
kXR_int32 dlen
Definition: XProtocol.hh:1236
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:309
Definition: XProtocol.hh:111
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
Request status.
Definition: XrdClXRootDResponses.hh:218
const uint16_t errCorruptedHeader
Definition: XrdClStatus.hh:103
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
kXR_char requestid
Definition: XProtocol.hh:1233
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
Definition: XProtocol.hh:1281
Definition: XrdClAsyncMsgReader.hh:359
uint16_t substrmnb
Definition: XrdClAsyncMsgReader.hh:376
struct ServerResponseHeader hdr
Definition: XProtocol.hh:1256
kXR_int32 dlen
Definition: XProtocol.hh:914
void Error(uint64_t topic, const char *format,...)
Report an error.
Utility class encapsulating reading response message logic.
Definition: XrdClAsyncMsgReader.hh:36
Definition: XrdClAsyncMsgReader.hh:361
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgReader.hh:71
ServerResponseHeader hdr
Definition: XProtocol.hh:1283
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Stream.
Definition: XrdClStream.hh:50
Definition: XrdClPostMasterInterfaces.hh:61
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgReader.hh:354
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
Definition: XrdClAsyncMsgReader.hh:356
Stream & strm
Definition: XrdClAsyncMsgReader.hh:375
virtual XRootDStatus GetBody(Message &message, Socket *socket)=0
virtual XRootDStatus GetHeader(Message &message, Socket *socket)=0
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
there are more (non-raw) data to be read
Definition: XrdClPostMasterInterfaces.hh:72
Definition: XProtocol.hh:905
kXR_int32 actnum
Definition: XProtocol.hh:940
A network socket.
Definition: XrdClSocket.hh:42
uint32_t inmsgsize
Definition: XrdClAsyncMsgReader.hh:383
Handle diagnostics.
Definition: XrdClLog.hh:100
Definition: XProtocol.hh:939
Definition: XrdClPostMasterInterfaces.hh:63
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)