XRootD
XrdClS3DownloadHandler.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClS3 client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
22 #include "XrdClS3Filesystem.hh"
23 
24 #include <XrdCl/XrdClConstants.hh>
25 #include <XrdCl/XrdClDefaultEnv.hh>
26 #include <XrdCl/XrdClFile.hh>
27 
28 #include <charconv>
29 
30 using namespace XrdClS3;
31 
32 namespace {
33 
34 class S3DownloadHandler : public XrdCl::ResponseHandler {
35 public:
36  S3DownloadHandler(std::unique_ptr<XrdCl::File> file, XrdCl::ResponseHandler *handler, time_t timeout)
37  : m_expiry(time(NULL) + timeout), m_file(std::move(file)), m_handler(handler), m_buffer(new XrdCl::Buffer(kReadSize))
38  {
39  if (timeout == 0) {
41  XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
42  m_expiry += val;
43  }
44  }
45 
46  virtual ~S3DownloadHandler() noexcept = default;
47 
48  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
49 
50 private:
51  time_t m_expiry; // Expiration time for the download operation
52  std::unique_ptr<XrdCl::File> m_file; // File we are reading from
53  XrdCl::ResponseHandler *m_handler; // Handler to call with the final result buffer (or failure).
54  std::unique_ptr<XrdCl::Buffer> m_buffer; // Buffer to hold the data read from the file
55  static constexpr size_t kReadSize = 32 * 1024; // Size of each read operation (32 KB)
56 
57  std::pair<time_t, bool> GetTimeout() const {
58  // Calculate the timeout based on the current time and the expiry time
59  time_t now = time(NULL);
60  if (now >= m_expiry) {
61  return {0, false}; // No time left, return 0 timeout
62  }
63  return {m_expiry - now, true};
64  }
65 
66  class ReadHandler : public XrdCl::ResponseHandler {
67  public:
68  ReadHandler(std::unique_ptr<S3DownloadHandler> parent) : m_parent(std::move(parent)) {}
69  virtual ~ReadHandler() noexcept = default;
70 
71  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
72  private:
73  std::unique_ptr<S3DownloadHandler> m_parent; // Pointer to the parent handler to access its members
74  };
75 
76  class CloseHandler : public XrdCl::ResponseHandler {
77  public:
78  CloseHandler(std::unique_ptr<S3DownloadHandler> parent, std::unique_ptr<XrdCl::XRootDStatus> status) : m_parent(std::move(parent)), m_read_status(std::move(status)) {}
79  virtual ~CloseHandler() noexcept = default;
80 
81  virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override;
82 
83  private:
84  std::unique_ptr<S3DownloadHandler> m_parent; // Pointer to the parent handler to access its members
85  std::unique_ptr<XrdCl::XRootDStatus> m_read_status; // Status from the read operation; if nullptr, the read was successful
86  };
87 };
88 
89 void
90 S3DownloadHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
91 {
92  std::unique_ptr<S3DownloadHandler> self(this);
93  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
94  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
95 
96  // If the open failed, we pass the status up the chain.
97  if (!status || !status->IsOK()) {
98  if (m_handler) m_handler->HandleResponse(status.release(), response.release());
99  return;
100  }
101  auto [timeout, ok] = GetTimeout();
102  if (!ok) {
103  // If we have no time left, we cannot proceed with the read.
104  if (m_handler) {
105  m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOperationExpired, 0, "Download operation timed out"), nullptr);
106  }
107  return;
108  }
109 
110  // Open succeeded, so we can now read the file.
111  auto st = m_file->Read(0, S3DownloadHandler::kReadSize, m_buffer->GetBufferAtCursor(), new ReadHandler(std::move(self)), timeout);
112  if (!st.IsOK()) {
113  // If the read request failed, we close the file and return the error.
114  std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(self), std::unique_ptr<XrdCl::XRootDStatus>(new XrdCl::XRootDStatus(st))));
115  auto close_st = m_file->Close(closeHandler.get(), timeout);
116  if (close_st.IsOK()) {
117  closeHandler.release(); // The close handler now owns itself
118  } else {
119  if (m_handler) {
120  m_handler->HandleResponse(new XrdCl::XRootDStatus(close_st), nullptr);
121  }
122  }
123  return;
124  }
125 }
126 
127 void
128 S3DownloadHandler::ReadHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
129  std::unique_ptr<ReadHandler> self(this);
130  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
131  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
132 
133  auto [timeout, ok] = m_parent->GetTimeout();
134  if (!ok) {
135  // If we have no time left, we cannot proceed with the read.
136  if (m_parent->m_handler) {
137  m_parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOperationExpired, 0, "Download operation timed out"), nullptr);
138  }
139  return;
140  }
141 
142  if (!status || !status->IsOK()) {
143  auto parent = m_parent.get();
144  std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), std::move(status)));
145  auto st = parent->m_file->Close(closeHandler.get(), timeout);
146  if (st.IsOK()) {
147  closeHandler.release();
148  } else if (parent->m_handler) {
149  parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
150  }
151  return;
152  }
153 
154  XrdCl::ChunkInfo *chunkInfo = nullptr;
155  response->Get(chunkInfo);
156  if (!chunkInfo) {
157  // If we didn't get a chunk, we can close the file and return.
158  auto parent = m_parent.get();
159  std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent),
160  std::unique_ptr<XrdCl::XRootDStatus>(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal, 0, "No chunk info received"))));
161  auto st = parent->m_file->Close(closeHandler.get(), timeout);
162  if (st.IsOK()) {
163  closeHandler.release();
164  } else if (parent->m_handler) {
165  parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
166  }
167  return;
168  }
169 
170  // If we got a chunk but the length is zero, that is the end of the file;
171  // we can close the file and return.
172  if (chunkInfo->GetLength() == 0) {
173  m_parent->m_buffer->ReAllocate(m_parent->m_buffer->GetCursor());
174  auto parent = m_parent.get();
175  std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), nullptr));
176  auto st = parent->m_file->Close(closeHandler.get(), timeout);
177  if (st.IsOK()) {
178  closeHandler.release();
179  } else if (parent->m_handler) {
180  parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
181  }
182  return;
183  }
184 
185  // Read was successful; read additional data if available.
186  m_parent->m_buffer->AdvanceCursor(chunkInfo->GetLength());
187  m_parent->m_buffer->ReAllocate(m_parent->m_buffer->GetCursor() + S3DownloadHandler::kReadSize);
188  auto st = m_parent->m_file->Read(m_parent->m_buffer->GetCursor(), kReadSize, m_parent->m_buffer->GetBufferAtCursor(), self.release(), timeout);
189  if (!st.IsOK()) {
190  // If the read request failed, close or delete the parent handler.
191  auto parent = m_parent.get();
192  std::unique_ptr<CloseHandler> closeHandler(new CloseHandler(std::move(m_parent), nullptr));
193  auto close_st = parent->m_file->Close(closeHandler.get(), timeout);
194  if (close_st.IsOK()) {
195  closeHandler.release();
196  } else if (parent->m_handler) {
197  parent->m_handler->HandleResponse(new XrdCl::XRootDStatus(close_st), nullptr);
198  }
199  }
200 }
201 
202 void
203 S3DownloadHandler::CloseHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
204  std::unique_ptr<CloseHandler> self(this);
205  std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
206  std::unique_ptr<XrdCl::AnyObject> response(response_raw);
207 
208  // If there was a read error, then we report that to the handler and ignore the close status.
209  if (m_read_status) {
210  // If we had a read status, we pass it up the chain.
211  if (m_parent->m_handler) {
212  m_parent->m_handler->HandleResponse(m_read_status.release(), nullptr);
213  }
214  return;
215  }
216 
217  if (!status || !status->IsOK()) {
218  if (m_parent->m_handler) {
219  m_parent->m_handler->HandleResponse(status.release(), nullptr);
220  }
221  return;
222  }
223 
224  // If the close was successful, we can pass the buffer to the handler.
225  response.reset(new XrdCl::AnyObject());
226  response->Set(m_parent->m_buffer.release(), true); // Take ownership of the buffer
227  if (m_parent->m_handler) {
228  m_parent->m_handler->HandleResponse(status.release(), response.release());
229  }
230 }
231 
232 } // namespace
233 
235 XrdClS3::DownloadUrl(const std::string &url, XrdClHttp::HeaderCallout *header_callout, XrdCl::ResponseHandler *handler, time_t timeout)
236 {
237  std::unique_ptr<XrdCl::File> http_file(new XrdCl::File());
238  // Hack - we need to set a few properties on the file object before the open occurs.
239  // However, the "real" (plugin) file object is not created until the open call.
240  // This forces the plugin object to be created, so we can set the properties and Open later.
241  auto status = http_file->Open(url, XrdCl::OpenFlags::Compress, XrdCl::Access::None, nullptr, time_t(0));
242  if (!status.IsOK()) {
243  return status;
244  }
245 
246 
247  if (header_callout) {
248  auto callout_loc = reinterpret_cast<long long>(header_callout);
249  size_t buf_size = 16;
250  char callout_buf[buf_size];
251  std::to_chars_result result = std::to_chars(callout_buf, callout_buf + buf_size - 1, callout_loc, 16);
252  if (result.ec == std::errc{}) {
253  std::string callout_str(callout_buf, result.ptr - callout_buf);
254  http_file->SetProperty("XrdClHttpHeaderCallout", callout_str);
255  }
256  }
257  http_file->SetProperty("XrdClHttpFullDownload", "true");
258 
259  auto http_file_raw = http_file.get();
260  S3DownloadHandler *downloadHandler = new S3DownloadHandler(std::move(http_file), handler, timeout);
261 
262  return http_file_raw->Open(url, XrdCl::OpenFlags::Read, XrdCl::Access::None, downloadHandler, timeout);
263 }
static void parent()
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
A file.
Definition: XrdClFile.hh:52
Handle an async response.
XrdCl::XRootDStatus DownloadUrl(const std::string &url, XrdClHttp::HeaderCallout *header_callout, XrdCl::ResponseHandler *handler, time_t timeout)
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
const int DefaultRequestTimeout
Describe a data chunk for vector read.
uint32_t GetLength() const
Get the data length.
@ Read
Open only for reading.