XRootD
XrdHttpTpcStream.hh
Go to the documentation of this file.
1 
11 
12 #include <memory>
13 #include <vector>
14 #include <string>
15 
16 #include <cstring>
17 
18 struct stat;
19 
20 class XrdSysError;
21 
22 namespace TPC {
23 class Stream {
24 public:
25  Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
26  : m_open_for_write(false),
27  m_avail_count(max_blocks),
28  m_fh(std::move(fh)),
29  m_offset(0),
30  m_log(log)
31  {
32  m_buffers.reserve(max_blocks);
33  for (size_t idx=0; idx < max_blocks; idx++) {
34  m_buffers.push_back(new Entry(buffer_size));
35  }
36  m_open_for_write = true;
37  }
38 
39  ~Stream();
40 
41  int Stat(struct stat *);
42 
43  int Read(off_t offset, char *buffer, size_t size);
44 
45  // Writes a buffer of a given size to an offset.
46  // This will often keep the buffer in memory in to present the underlying
47  // filesystem with a single stream of data (required for HDFS); further,
48  // it will also buffer to align the writes on a 1MB boundary (required
49  // for some RADOS configurations). When force is set to true, it will
50  // skip the buffering and always write (this should only be done at the
51  // end of a stream!).
52  //
53  // Returns the number of bytes written; on error, returns -1 and sets
54  // the error code and error message for the stream
55  ssize_t Write(off_t offset, const char *buffer, size_t size, bool force);
56 
57  size_t AvailableBuffers() const {return m_avail_count;}
58 
59  void DumpBuffers() const;
60 
61  // Flush and finalize the stream. If all data has been sent to the underlying
62  // file handle, close() will be invoked on the file handle.
63  //
64  // Further write operations on this stream will result in an error.
65  // If any memory buffers remain, an error occurs.
66  //
67  // Returns true on success; false otherwise.
68  bool Finalize();
69 
70  std::string GetErrorMessage() const {return m_error_buf;}
71 
72 private:
73 
74  class Entry {
75  public:
76  Entry(size_t capacity) :
77  m_offset(-1),
78  m_capacity(capacity),
79  m_size(0)
80  {}
81 
82  bool Available() const {return m_offset == -1;}
83 
84  int Write(Stream &stream, bool force) {
85  if (Available() || !CanWrite(stream)) {return 0;}
86  // Only full buffer writes are accepted unless the stream forces a flush
87  // (i.e., we are at EOF) because the multistream code uses buffer occupancy
88  // to determine how many streams are currently in-flight. If we do an early
89  // write, then the buffer will be empty and the multistream code may decide
90  // to start another request (which we don't have the capacity to serve!).
91  if (!force && (m_size != m_capacity)) {
92  return 0;
93  }
94  ssize_t retval = stream.WriteImpl(m_offset, &m_buffer[0], m_size);
95  // Currently the only valid negative value is SFS_ERROR (-1); checking for
96  // all negative values to future-proof the code.
97  if ((retval < 0) || (static_cast<size_t>(retval) != m_size)) {
98  return -1;
99  }
100  m_offset = -1;
101  m_size = 0;
102  m_buffer.clear();
103  return retval;
104  }
105 
106  size_t Accept(off_t offset, const char *buf, size_t size) {
107  // Validate acceptance criteria.
108  if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
109  return 0;
110  }
111  size_t to_accept = m_capacity - m_size;
112  if (to_accept == 0) {return 0;}
113  if (size > to_accept) {
114  size = to_accept;
115  }
116 
117  // Inflate the underlying buffer if needed.
118  ssize_t new_bytes_needed = (m_size + size) - m_buffer.size();
119  if (new_bytes_needed > 0) {
120  m_buffer.resize(m_capacity);
121  }
122 
123  // Finally, do the copy.
124  memcpy(&m_buffer[0] + m_size, buf, size);
125  m_size += size;
126  if (m_offset == -1) {
127  m_offset = offset;
128  }
129  return size;
130  }
131 
132  void ShrinkIfUnused() {
133  if (!Available()) {return;}
134 #if __cplusplus > 199711L
135  m_buffer.shrink_to_fit();
136 #endif
137  }
138 
139  void Move(Entry &other) {
140  m_buffer.swap(other.m_buffer);
141  m_offset = other.m_offset;
142  m_size = other.m_size;
143  }
144 
145  off_t GetOffset() const {return m_offset;}
146  size_t GetCapacity() const {return m_capacity;}
147  size_t GetSize() const {return m_size;}
148 
149  private:
150 
151  Entry(const Entry&) = delete;
152 
153  bool CanWrite(Stream &stream) const {
154  return (m_size > 0) && (m_offset == stream.m_offset);
155  }
156 
157  off_t m_offset; // Offset within file that m_buffer[0] represents.
158  size_t m_capacity;
159  size_t m_size; // Number of bytes held in buffer.
160  std::vector<char> m_buffer;
161  };
162 
163  ssize_t WriteImpl(off_t offset, const char *buffer, size_t size);
164 
165  bool m_open_for_write;
166  size_t m_avail_count;
167  std::unique_ptr<XrdSfsFile> m_fh;
168  off_t m_offset;
169  std::vector<Entry*> m_buffers;
170  XrdSysError &m_log;
171  std::string m_error_buf;
172 };
173 }
int stat(const char *path, struct stat *buf)
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
void DumpBuffers() const
std::string GetErrorMessage() const
size_t AvailableBuffers() const
int Stat(struct stat *)
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)