xrootd
XrdClAsyncWriter.hh
Go to the documentation of this file.
1 /*
2  * XrdClAsyncWriter.hh
3  *
4  * Created on: 3 May 2021
5  * Author: simonm
6  */
7 
8 #ifndef SRC_XRDCL_XRDCLASYNCWRITER_HH_
9 #define SRC_XRDCL_XRDCLASYNCWRITER_HH_
10 
11 #include "XrdCl/XrdClStatus.hh"
12 #include "XrdCl/XrdClSocket.hh"
13 #include "XrdCl/XrdClMessage.hh"
14 #include "XrdCl/XrdClLog.hh"
15 #include "XrdCl/XrdClDefaultEnv.hh"
16 #include "XrdCl/XrdClConstants.hh"
17 
18 namespace XrdCl
19 {
20 
 //----------------------------------------------------------------------------
 // Body of class AsyncWriter. The class declaration line itself (listing
 // line 21) is absent from this extract, as are listing lines 25, 27-28, 36,
 // 55 and 57.
 //
 // AsyncWriter keeps a status value and exposes Write(), which decides from
 // that status whether to invoke the subclass-provided WriteImpl().
 //----------------------------------------------------------------------------
22  {
23  public:
24 
 // Constructor. Only part of it is visible here: the parameter list ends
 // with streamName and the initializer list begins by binding the socket
 // member to the socket argument. The remaining initializers are on listing
 // lines that are missing from this extract.
26  const std::string &streamName ) : socket( socket ),
29  {
30  }
31 
 // Virtual destructor with an empty body.
32  virtual ~AsyncWriter()
33  {
34  }
35 
 // Body of Write() (its signature, listing line 36, is missing here).
 //  - If the stored status is not OK it is returned unchanged and nothing
 //    is written.
 //  - suDone, suPartial, suAlreadyDone and any unlisted code: the stored
 //    status is returned without calling WriteImpl().
 //  - suNotStarted, suRetry, suContinue: WriteImpl() is called, its result
 //    is stored in status, and that result is returned.
37  {
38  if( !status.IsOK() ) return status;
39  switch( status.code )
40  {
41  case suDone : return status;
42  case suPartial : return status;
43  case suAlreadyDone : return status;
44  case suNotStarted : return ( status = WriteImpl() );
45  case suRetry : return ( status = WriteImpl() );
46  case suContinue : return ( status = WriteImpl() );
47  default: return status;
48  }
49  }
50 
51  protected:
52 
 // Pure virtual hook invoked by Write(); subclasses must provide it.
53  virtual Status WriteImpl() = 0;
54 
 // Stream name; MsgWriter uses it as the "[%s]" prefix of its log message.
56  std::string streamName;
58  };
59 
 //----------------------------------------------------------------------------
 // MsgWriter: an AsyncWriter that holds a single Message in a
 // std::unique_ptr and sends it through the socket, starting at the
 // message's cursor position.
 //
 // Listing lines 63, 77, 84 and 87 are missing from this extract, so the
 // constructor/WriteImpl signatures and one statement in each of Reset() and
 // Replay() are not visible.
 //----------------------------------------------------------------------------
60  class MsgWriter : public AsyncWriter
61  {
62  public:
 // Constructor (first line missing): forwards socket and streamName to
 // AsyncWriter and starts out holding no message.
64  const std::string &streamName ) : AsyncWriter( socket, streamName ),
65  msg( nullptr )
66  {
67  }
68 
 // Returns true when a message is currently held, false otherwise.
69  inline bool HasMsg()
70  {
71  return bool( msg );
72  }
73 
 // Replaces the held message with msg; unique_ptr::reset destroys the
 // previously held message, if any. With the default nullptr argument the
 // writer ends up holding no message.
 // NOTE(review): listing line 77 is missing from this extract, so this
 // method may do more than what is shown.
74  void Reset( Message *msg = nullptr )
75  {
76  this->msg.reset( msg );
78  }
79 
 // Rewinds the held message's cursor to 0; does nothing when no message is
 // held.
 // NOTE(review): listing line 84 is missing from this extract, so this
 // method may do more than what is shown.
80  void Replay()
81  {
82  if( !msg ) return;
83  msg->SetCursor( 0 );
85  }
86 
 // Body of WriteImpl() (its signature, listing line 87, is missing here).
 // Returns the stored status untouched when no message is held. Otherwise
 // keeps calling socket.Send() on the unsent tail of the message until
 // nothing is left, then logs and returns a default-constructed
 // XRootDStatus.
88  {
89  if( !msg ) return status;
90  Log *log = DefaultEnv::GetLog();
91 
92  //--------------------------------------------------------------------------
93  // Try to write down the current message
94  //--------------------------------------------------------------------------
95  size_t leftToBeWritten = msg->GetSize()-msg->GetCursor();
96  while( leftToBeWritten )
97  {
98  int bytesWritten = 0;
99  status = socket.Send( msg->GetBufferAtCursor(), leftToBeWritten, bytesWritten );
 // A failed Send rewinds the cursor to 0 before the error is returned.
100  if( !status.IsOK() )
101  {
102  msg->SetCursor( 0 );
103  return status;
104  }
 // On suRetry, return before the cursor is advanced; the cursor keeps the
 // position reached by earlier iterations.
105  if( status.code == suRetry ) return status;
106  msg->AdvanceCursor( bytesWritten );
 // NOTE(review): bytesWritten is an int while leftToBeWritten is a size_t,
 // so the subtraction converts bytesWritten to unsigned. This presumably
 // relies on Send never reporting a negative count on success - confirm.
107  leftToBeWritten -= bytesWritten;
108  }
109 
110  //--------------------------------------------------------------------------
111  // We have written the message successfully
112  //--------------------------------------------------------------------------
 // NOTE(review): assuming Log::Dump is printf-style, "0x%x" is given a
 // pointer (msg.get()) and "%d" is given msg->GetSize(), whose return type
 // is not visible here - confirm that the conversions match the arguments.
113  log->Dump( AsyncSockMsg, "[%s] Wrote a message: %s (0x%x), %d bytes",
114  streamName.c_str(), msg->GetDescription().c_str(),
115  msg.get(), msg->GetSize() );
116  return XRootDStatus();
117  }
118 
119  private:
 // The message being written; owned by this writer.
120  std::unique_ptr<Message> msg;
121  };
122 
 // Placeholder subclass of AsyncWriter, marked TODO: it declares no members
 // and provides no WriteImpl() yet.
123  class ChunkWriter : public AsyncWriter // TODO
124  {
125 
126  };
127 
 // Placeholder subclass of AsyncWriter, marked TODO: it declares no members
 // and provides no WriteImpl() yet.
128  class KBuffWriter : public AsyncWriter // TODO
129  {
130 
131  };
132 
133 } /* namespace XrdCl */
134 
135 #endif /* SRC_XRDCL_XRDCLASYNCWRITER_HH_ */
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
Definition: XrdClAsyncWriter.hh:60
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
virtual Status WriteImpl()=0
Status WriteImpl()
Definition: XrdClAsyncWriter.hh:87
Definition: XrdClAsyncWriter.hh:123
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
const uint16_t suDone
Definition: XrdClStatus.hh:38
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:145
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:122
const uint16_t suPartial
Definition: XrdClStatus.hh:41
Socket & socket
Definition: XrdClAsyncWriter.hh:55
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Procedure execution status.
Definition: XrdClStatus.hh:112
Definition: XrdClAsyncWriter.hh:21
static Log * GetLog()
Get default log.
Status Write()
Definition: XrdClAsyncWriter.hh:36
Definition: XrdClAsyncWriter.hh:128
AsyncWriter(Socket &socket, const std::string &streamName)
Definition: XrdClAsyncWriter.hh:25
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
const uint16_t suNotStarted
Definition: XrdClStatus.hh:43
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
void Replay()
Definition: XrdClAsyncWriter.hh:80
bool HasMsg()
Definition: XrdClAsyncWriter.hh:69
std::unique_ptr< Message > msg
Definition: XrdClAsyncWriter.hh:120
std::string streamName
Definition: XrdClAsyncWriter.hh:56
const uint16_t suContinue
Definition: XrdClStatus.hh:39
Status status
Definition: XrdClAsyncWriter.hh:57
virtual ~AsyncWriter()
Definition: XrdClAsyncWriter.hh:32
MsgWriter(Socket &socket, const std::string &streamName)
Definition: XrdClAsyncWriter.hh:63
void Reset(Message *msg=nullptr)
Definition: XrdClAsyncWriter.hh:74
const uint16_t suRetry
Definition: XrdClStatus.hh:40
A network socket.
Definition: XrdClSocket.hh:41
Handle diagnostics.
Definition: XrdClLog.hh:100
XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)