XRootD
XrdClHttpOpPut.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3 /* */
4 /* This file is part of the XrdClHttp client plugin for XRootD. */
5 /* */
6 /* XRootD is free software: you can redistribute it and/or modify it under */
7 /* the terms of the GNU Lesser General Public License as published by the */
8 /* Free Software Foundation, either version 3 of the License, or (at your */
9 /* option) any later version. */
10 /* */
11 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14 /* License for more details. */
15 /* */
16 /* The copyright holder's institutional names and contributor's names may not */
17 /* be used to endorse or promote products derived from this software without */
18 /* specific prior written permission of the institution or contributor. */
19 /******************************************************************************/
20 
21 #include "XrdClHttpOps.hh"
22 
23 #include <XrdCl/XrdClLog.hh>
24 
25 using namespace XrdClHttp;
26 
27 CurlPutOp::CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
28  const std::string &url, const char *buffer, size_t buffer_size, struct timespec timeout,
29  XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
30  : CurlOperation(handler, url, timeout, logger, callout, header_callout),
31  m_data(buffer, buffer_size),
32  m_default_handler(default_handler)
33 {
34 }
35 
36 CurlPutOp::CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
37  const std::string &url, XrdCl::Buffer &&buffer, struct timespec timeout,
38  XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
39  : CurlOperation(handler, url, timeout, logger, callout, header_callout),
40  m_owned_buffer(std::move(buffer)),
41  m_data(buffer.GetBuffer(), buffer.GetSize()),
42  m_default_handler(default_handler)
43 {
44 
45 }
46 
47 void
48 CurlPutOp::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
49 {
50  SetDone(true);
51  if (m_handler == nullptr && m_default_handler == nullptr) {return;}
52  if (!msg.empty()) {
53  m_logger->Debug(kLogXrdClHttp, "PUT operation at offset %llu failed with message: %s", static_cast<long long unsigned>(m_offset), msg.c_str());
54  } else {
55  m_logger->Debug(kLogXrdClHttp, "PUT operation at offset %llu failed with status code %d", static_cast<long long unsigned>(m_offset), errNum);
56  }
57 
58  auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, msg);
59  auto handle = m_handler;
60  m_handler = nullptr;
61  if (handle) handle->HandleResponse(status, nullptr);
62  else m_default_handler->HandleResponse(status, nullptr);
63 }
64 
65 bool
67 {
68  m_curl_handle = curl;
69  if (!CurlOperation::Setup(curl, worker)) return false;
70 
71  curl_easy_setopt(m_curl.get(), CURLOPT_UPLOAD, 1);
72  curl_easy_setopt(m_curl.get(), CURLOPT_READDATA, this);
73  curl_easy_setopt(m_curl.get(), CURLOPT_READFUNCTION, CurlPutOp::ReadCallback);
74  if (m_object_size >= 0) {
75  curl_easy_setopt(m_curl.get(), CURLOPT_INFILESIZE_LARGE, m_object_size);
76  }
77  return true;
78 }
79 
80 void
82 {
83  curl_easy_setopt(m_curl.get(), CURLOPT_READFUNCTION, nullptr);
84  curl_easy_setopt(m_curl.get(), CURLOPT_READDATA, nullptr);
85  curl_easy_setopt(m_curl.get(), CURLOPT_UPLOAD, 0);
86  // If one uses just `-1` here -- instead of casting it to `curl_off_t`, then on Linux
87  // we have observed compilers casting the `-1` to an unsigned, resulting in the file
88  // size being set to 4294967295 instead of "unknown". This causes the second use of the
89  // handle to claim to upload a large file, resulting in the client hanging while waiting
90  // for more input data (which will never come).
91  curl_easy_setopt(m_curl.get(), CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(-1));
93 }
94 
95 void
97 {
98  SetPaused(true);
99  if (m_handler == nullptr && m_default_handler == nullptr) {
100  m_logger->Warning(kLogXrdClHttp, "Put operation paused with no callback handler");
101  return;
102  }
103  auto handle = m_handler;
104  auto status = new XrdCl::XRootDStatus();
105  m_handler = nullptr;
106  m_owned_buffer.Free();
107  // Note: As soon as this is invoked, another thread may continue and start to manipulate
108  // the CurlPutOp object. To avoid race conditions, all reads/writes to member data must
109  // be done *before* the callback is invoked.
110  if (handle) handle->HandleResponse(status, nullptr);
111  else m_default_handler->HandleResponse(status, nullptr);
112 }
113 
114 void
116 {
117  SetDone(false);
118  if (m_handler == nullptr) {
119  m_logger->Warning(kLogXrdClHttp, "Put operation succeeded with no callback handler");
120  return;
121  }
122  auto status = new XrdCl::XRootDStatus();
123  auto handle = m_handler;
124  m_handler = nullptr;
125  handle->HandleResponse(status, nullptr);
126 }
127 
128 bool
130 {
131  if (!m_curl_handle) {
132  return false;
133  }
134 
135  CURLcode rc;
136  if ((rc = curl_easy_pause(m_curl_handle, CURLPAUSE_CONT)) != CURLE_OK) {
137  m_logger->Error(kLogXrdClHttp, "Failed to continue a paused handle: %s", curl_easy_strerror(rc));
138  return false;
139  }
140  SetPaused(false);
141  return m_curl_handle;
142 }
143 
144 bool
145 CurlPutOp::Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size)
146 {
147  if (op.get() != this) {
148  Fail(XrdCl::errInternal, 0, "Interface error: must provide shared pointer to self");
149  return false;
150  }
151  m_handler = handler;
152  m_data = std::string_view(buffer, buffer_size);
153  if (!buffer_size)
154  {
155  m_final = true;
156  }
157 
158  try {
159  m_continue_queue->Produce(op);
160  } catch (...) {
161  Fail(XrdCl::errInternal, ENOMEM, "Failed to continue the curl operation");
162  return false;
163  }
164  return true;
165 }
166 
167 bool
168 CurlPutOp::Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, XrdCl::Buffer &&buffer)
169 {
170  if (op.get() != this) {
171  Fail(XrdCl::errInternal, 0, "Interface error: must provide shared pointer to self");
172  return false;
173  }
174  m_handler = handler;
175  m_data = std::string_view(buffer.GetBuffer(), buffer.GetSize());
176  if (!buffer.GetSize())
177  {
178  m_final = true;
179  }
180 
181  try {
182  m_continue_queue->Produce(op);
183  } catch (...) {
184  Fail(XrdCl::errInternal, ENOMEM, "Failed to continue the curl operation");
185  return false;
186  }
187  return true;
188 }
189 
190 size_t CurlPutOp::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
191  // The callback gets the void pointer that we set with CURLOPT_READDATA. In
192  // this case, it's a pointer to an HTTPRequest::Payload struct that contains
193  // the data to be sent, along with the offset of the data that has already
194  // been sent.
195  auto op = static_cast<CurlPutOp*>(v);
196  //op->m_logger->Debug(kLogXrdClHttp, "Read callback with buffer %ld and avail data %ld", size*n, op->m_data.size());
197 
198  // TODO: Check for timeouts. If there was one, abort the callback function
199  // and cause the curl worker thread to handle it.
200 
201  if (op->m_data.empty()) {
202  if (op->m_final) {
203  return 0;
204  } else {
205  op->Pause();
206  return CURL_READFUNC_PAUSE;
207  }
208  }
209 
210  size_t request = size * n;
211  op->UpdateBytes(request);
212  if (request > op->m_data.size()) {
213  request = op->m_data.size();
214  }
215 
216  memcpy(buffer, op->m_data.data(), request);
217  op->m_data = op->m_data.substr(request);
218 
219  return request;
220 }
void CURL
void SetDone(bool has_failed)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
XrdCl::ResponseHandler * m_handler
void SetPaused(bool paused)
virtual bool Setup(CURL *curl, CurlWorker &)
bool ContinueHandle() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
bool Setup(CURL *curl, CurlWorker &) override
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size)
CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, const char *buffer, size_t buffer_size, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
void Success() override
void ReleaseHandle() override
Binary blob representation.
Definition: XrdClBuffer.hh:34
void Free()
Free the buffer.
Definition: XrdClBuffer.hh:99
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp