aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/httpclient.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-08 08:51:54 -0400
committerGitHub <[email protected]>2023-09-08 14:51:54 +0200
commit2f6a49f8d850806da94a92d0bd23e22735bf19a2 (patch)
treeca11d34ade5cdded3d59c37294c1c2a299738800 /src/zenhttp/httpclient.cpp
parent0.2.20 (diff)
downloadzen-2f6a49f8d850806da94a92d0bd23e22735bf19a2.tar.xz
zen-2f6a49f8d850806da94a92d0bd23e22735bf19a2.zip
Extend http client (#387)
* extend http client with configuration, headers, parameters and disk streaming upload/download
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
-rw-r--r--src/zenhttp/httpclient.cpp511
1 files changed, 437 insertions, 74 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 744787201..f3a9ad71b 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -5,6 +5,9 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/session.h>
@@ -12,6 +15,7 @@
#include <zencore/stream.h>
#include <zencore/testing.h>
#include <zencore/trace.h>
+#include <zenhttp/formatters.h>
#include <zenhttp/httpshared.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -53,11 +57,11 @@ AsCprBody(const CompositeBuffer& Buffers)
//////////////////////////////////////////////////////////////////////////
HttpClient::Response
-ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode)
+ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
// into buffer directly
- IoBuffer ResponseBuffer = IoBuffer(IoBuffer::Clone, HttpResponse.text.data(), HttpResponse.text.size());
+ IoBuffer ResponseBuffer = Payload ? std::move(Payload) : IoBuffer(IoBuffer::Clone, HttpResponse.text.data(), HttpResponse.text.size());
if (auto It = HttpResponse.header.find("Content-Type"); It != HttpResponse.header.end())
{
@@ -66,30 +70,50 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp
ResponseBuffer.SetContentType(ContentType);
}
- return HttpClient::Response{.StatusCode = WorkResponseCode, .ResponsePayload = std::move(ResponseBuffer)};
+ if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound)
+ {
+ ZEN_WARN("HttpClient request failed: {}", HttpResponse);
+ }
+
+ return HttpClient::Response{.StatusCode = WorkResponseCode,
+ .ResponsePayload = std::move(ResponseBuffer),
+ .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()),
+ .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes),
+ .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes),
+ .ElapsedSeconds = HttpResponse.elapsed};
}
HttpClient::Response
-CommonResponse(cpr::Response&& HttpResponse)
+CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
-
- if (HttpResponse.status_code == 0)
+ if (HttpResponse.error)
{
- // Client side failure code
+ ZEN_WARN("HttpClient client error: {}", HttpResponse);
+ // Client side failure code
return HttpClient::Response{
.StatusCode = WorkResponseCode,
- .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(HttpResponse.error.message.data(), HttpResponse.error.message.size())};
+ .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size()),
+ .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()),
+ .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes),
+ .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes),
+ .ElapsedSeconds = HttpResponse.elapsed,
+ .Error = HttpClient::ErrorContext{.ErrorCode = gsl::narrow<int>(HttpResponse.error.code),
+ .ErrorMessage = HttpResponse.error.message}};
}
- if (WorkResponseCode == HttpResponseCode::NoContent || HttpResponse.text.empty())
+ if (WorkResponseCode == HttpResponseCode::NoContent || (HttpResponse.text.empty() && !Payload))
{
- return HttpClient::Response{.StatusCode = WorkResponseCode};
+ return HttpClient::Response{.StatusCode = WorkResponseCode,
+ .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()),
+ .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes),
+ .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes),
+ .ElapsedSeconds = HttpResponse.elapsed};
}
else
{
- return ResponseWithPayload(HttpResponse, WorkResponseCode);
+ return ResponseWithPayload(HttpResponse, WorkResponseCode, std::move(Payload));
}
}
@@ -108,6 +132,42 @@ struct HttpClient::Impl : public RefCounted
~Session() { Outer->ReleaseSession(CprSession); }
inline cpr::Session* operator->() const { return CprSession; }
+ inline cpr::Response Get()
+ {
+ cpr::Response Result = CprSession->Get();
+ ZEN_TRACE("GET {}", Result);
+ return Result;
+ }
+ inline cpr::Response Download(cpr::WriteCallback&& write)
+ {
+ cpr::Response Result = CprSession->Download(write);
+ ZEN_TRACE("GET {}", Result);
+ return Result;
+ }
+ inline cpr::Response Head()
+ {
+ cpr::Response Result = CprSession->Head();
+ ZEN_TRACE("HEAD {}", Result);
+ return Result;
+ }
+ inline cpr::Response Put()
+ {
+ cpr::Response Result = CprSession->Put();
+ ZEN_TRACE("PUT {}", Result);
+ return Result;
+ }
+ inline cpr::Response Post()
+ {
+ cpr::Response Result = CprSession->Post();
+ ZEN_TRACE("POST {}", Result);
+ return Result;
+ }
+ inline cpr::Response Delete()
+ {
+ cpr::Response Result = CprSession->Delete();
+ ZEN_TRACE("DELETE {}", Result);
+ return Result;
+ }
private:
Impl* Outer;
@@ -117,7 +177,11 @@ struct HttpClient::Impl : public RefCounted
Session& operator=(Session&&) = delete;
};
- Session AllocSession(const std::string_view BaseUrl, const std::string_view Url);
+ Session AllocSession(const std::string_view BaseUrl,
+ const std::string_view Url,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters);
private:
RwLock m_SessionLock;
@@ -142,38 +206,223 @@ HttpClient::Impl::~Impl()
}
HttpClient::Impl::Session
-HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view ResourcePath)
+HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
+ const std::string_view ResourcePath,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters)
{
- RwLock::ExclusiveLockScope _(m_SessionLock);
+ bool IsNew = false;
+ cpr::Session* CprSession = nullptr;
+ m_SessionLock.WithExclusiveLock([&] {
+ if (m_Sessions.empty())
+ {
+ CprSession = new cpr::Session();
+ IsNew = true;
+ }
+ else
+ {
+ CprSession = m_Sessions.back();
+ m_Sessions.pop_back();
+ }
+ });
- ExtendableStringBuilder<128> UrlBuffer;
- UrlBuffer << BaseUrl << ResourcePath;
+ if (IsNew)
+ {
+ CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout);
+ CprSession->SetTimeout(ConnectionSettings.Timeout);
+ if (ConnectionSettings.AssumeHttp2)
+ {
+ CprSession->SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE});
+ }
+ }
- if (m_Sessions.empty())
+ if (!AdditionalHeader->empty())
{
- cpr::Session* NewSession = new cpr::Session();
- NewSession->SetUrl(UrlBuffer.c_str());
- return Session(this, NewSession);
+ CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end()));
}
else
{
- cpr::Session* NewSession = m_Sessions.back();
- m_Sessions.pop_back();
-
- NewSession->SetUrl(UrlBuffer.c_str());
- return Session(this, NewSession);
+ CprSession->SetHeader({});
+ }
+ if (!Parameters->empty())
+ {
+ cpr::Parameters Tmp;
+ for (auto It = Parameters->begin(); It != Parameters->end(); It++)
+ {
+ Tmp.Add({It->first, It->second});
+ }
+ CprSession->SetParameters(Tmp);
+ }
+ else
+ {
+ CprSession->SetParameters({});
}
+
+ ExtendableStringBuilder<128> UrlBuffer;
+ UrlBuffer << BaseUrl << ResourcePath;
+ CprSession->SetUrl(UrlBuffer.c_str());
+
+ return Session(this, CprSession);
}
void
HttpClient::Impl::ReleaseSession(cpr::Session* CprSession)
{
+ CprSession->SetUrl({});
+ CprSession->SetHeader({});
+ CprSession->SetBody({});
m_SessionLock.WithExclusiveLock([&] { m_Sessions.push_back(CprSession); });
}
+namespace detail {
+
+ static std::atomic_uint32_t TempFileBaseIndex;
+
+} // namespace detail
+
+class TempPayloadFile
+{
+public:
+ TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {}
+ ~TempPayloadFile()
+ {
+ if (m_FileHandle)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ // Mark file for deletion when final handle is closed
+ FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE};
+
+ SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+ BOOL Success = CloseHandle(m_FileHandle);
+#else
+ std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle);
+ unlink(FilePath.c_str());
+ int Fd = int(uintptr_t(m_FileHandle));
+ bool Success = (close(Fd) == 0);
+#endif
+ if (!Success)
+ {
+ ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString());
+ }
+
+ m_FileHandle = nullptr;
+ }
+ }
+
+ std::error_code Open(const std::filesystem::path& TempFolderPath)
+ {
+ ZEN_ASSERT(m_FileHandle == nullptr);
+
+ std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) |
+ detail::TempFileBaseIndex.fetch_add(1);
+
+ std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex);
+#if ZEN_PLATFORM_WINDOWS
+ LPCWSTR lpFileName = FileName.c_str();
+ const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE);
+ const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr;
+ const DWORD dwCreationDisposition = CREATE_ALWAYS;
+ const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
+ const HANDLE hTemplateFile = nullptr;
+ const HANDLE FileHandle = CreateFile(lpFileName,
+ dwDesiredAccess,
+ dwShareMode,
+ lpSecurityAttributes,
+ dwCreationDisposition,
+ dwFlagsAndAttributes,
+ hTemplateFile);
+
+ if (FileHandle == INVALID_HANDLE_VALUE)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+#else // ZEN_PLATFORM_WINDOWS
+ int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC;
+ int Fd = open(FileName.c_str(), OpenFlags, 0666);
+ if (Fd < 0)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+ fchmod(Fd, 0666);
+
+ void* FileHandle = (void*)(uintptr_t(Fd));
+#endif // ZEN_PLATFORM_WINDOWS
+ m_FileHandle = FileHandle;
+
+ return {};
+ }
+
+ std::error_code Write(std::string_view DataString)
+ {
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
+ const void* Data = DataString.data();
+ std::size_t Size = DataString.size();
+
+ while (Size)
+ {
+ const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize);
+ uint64_t NumberOfBytesWritten = 0;
+#if ZEN_PLATFORM_WINDOWS
+ OVERLAPPED Ovl{};
+
+ Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32);
+
+ DWORD dwNumberOfBytesWritten = 0;
+
+ BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
+ if (Success)
+ {
+ NumberOfBytesWritten = static_cast<uint64_t>(dwNumberOfBytesWritten);
+ }
+#else
+ static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
+ int Fd = int(uintptr_t(m_FileHandle));
+ int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset);
+ bool Success = (BytesWritten > 0);
+ if (Success)
+ {
+ NumberOfBytesWritten = static_cast<uint64_t>(BytesWritten);
+ }
+#endif
+
+ if (!Success)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+
+ Size -= NumberOfBytesWritten;
+ m_WriteOffset += NumberOfBytesWritten;
+ Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesWritten;
+ }
+ return {};
+ }
+
+ IoBuffer DetachToIoBuffer()
+ {
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ void* FileHandle = m_FileHandle;
+ IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
+ Buffer.SetDeleteOnClose(true);
+ m_FileHandle = 0;
+ m_WriteOffset = 0;
+ return Buffer;
+ }
+
+private:
+ void* m_FileHandle;
+ std::uint64_t m_WriteOffset;
+};
+
//////////////////////////////////////////////////////////////////////////
-HttpClient::HttpClient(std::string_view BaseUri) : m_BaseUri(BaseUri), m_Impl(new Impl)
+HttpClient::HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings)
+: m_BaseUri(BaseUri)
+, m_ConnectionSettings(Connectionsettings)
+, m_Impl(new Impl)
{
StringBuilder<32> SessionId;
GetSessionId().ToString(SessionId);
@@ -185,11 +434,11 @@ HttpClient::~HttpClient()
}
HttpClient::Response
-HttpClient::TransactPackage(std::string_view Url, CbPackage Package)
+HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::TransactPackage");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
// First, list of offered chunks for filtering on the server end
@@ -214,10 +463,10 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package)
BinaryWriter MemWriter;
Writer.Save(MemWriter);
- Sess->SetHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()});
- cpr::Response FilterResponse = Sess->Post();
+ cpr::Response FilterResponse = Sess.Post();
if (FilterResponse.status_code == 200)
{
@@ -256,10 +505,10 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package)
CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage);
SharedBuffer FlatMessage = Message.Flatten();
- Sess->SetHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()});
- cpr::Response FilterResponse = Sess->Post();
+ cpr::Response FilterResponse = Sess.Post();
if (!IsHttpSuccessCode(FilterResponse.status_code))
{
@@ -284,90 +533,199 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package)
//
HttpClient::Response
-HttpClient::Put(std::string_view Url, const IoBuffer& Payload)
+HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Put");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
Sess->SetBody(AsCprBody(Payload));
- Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
- return CommonResponse(Sess->Put());
+ return CommonResponse(Sess.Put());
}
HttpClient::Response
-HttpClient::Get(std::string_view Url)
+HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
{
ZEN_TRACE_CPU("HttpClient::Get");
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters);
+
+ return CommonResponse(Sess.Get());
+}
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+HttpClient::Response
+HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
+{
+ ZEN_TRACE_CPU("HttpClient::Head");
- return CommonResponse(Sess->Get());
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+
+ return CommonResponse(Sess.Head());
}
HttpClient::Response
-HttpClient::Delete(std::string_view Url)
+HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Delete");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- return CommonResponse(Sess->Delete());
+ return CommonResponse(Sess.Delete());
}
HttpClient::Response
-HttpClient::Post(std::string_view Url)
+HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::PostNoPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
- return CommonResponse(Sess->Post());
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+
+ return CommonResponse(Sess.Post());
}
HttpClient::Response
-HttpClient::Post(std::string_view Url, const IoBuffer& Payload)
+HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::PostWithPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
Sess->SetBody(AsCprBody(Payload));
- Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
- return CommonResponse(Sess->Post());
+ return CommonResponse(Sess.Post());
}
HttpClient::Response
-HttpClient::Post(std::string_view Url, CbObject Payload)
+HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::PostObjectPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
Sess->SetBody(AsCprBody(Payload));
- Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}});
- return CommonResponse(Sess->Post());
+ return CommonResponse(Sess.Post());
}
HttpClient::Response
-HttpClient::Post(std::string_view Url, CbPackage Pkg)
+HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::PostPackage");
CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url);
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
Sess->SetBody(AsCprBody(Message));
- Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}});
- return CommonResponse(Sess->Post());
+ return CommonResponse(Sess.Post());
+}
+
+HttpClient::Response
+HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader)
+{
+ ZEN_TRACE_CPU("HttpClient::Upload");
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
+
+ uint64_t Offset = 0;
+ if (Payload.IsWholeFile())
+ {
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ }
+ else
+ {
+ Sess->SetBody(AsCprBody(Payload));
+ }
+ return CommonResponse(Sess.Put());
+}
+
+HttpClient::Response
+HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader)
+{
+ ZEN_TRACE_CPU("HttpClient::Upload");
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+
+ return CommonResponse(Sess.Put());
+}
+
+HttpClient::Response
+HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader)
+{
+ ZEN_TRACE_CPU("HttpClient::Download");
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+
+ std::string PayloadString;
+ std::unique_ptr<TempPayloadFile> PayloadFile;
+
+ cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) {
+ if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ {
+ PayloadFile = std::make_unique<TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message());
+ return false;
+ }
+ PayloadFile->Write(PayloadString);
+ PayloadString.clear();
+ }
+ if (PayloadFile)
+ {
+ std::error_code Ec = PayloadFile->Write(data);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ }});
+
+ if (!PayloadString.empty())
+ {
+ Response.text = std::move(PayloadString);
+ }
+
+ return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
}
//////////////////////////////////////////////////////////////////////////
CbObject
-HttpClient::Response::AsObject()
+HttpClient::Response::AsObject() const
{
// TODO: sanity check the payload format etc
@@ -380,7 +738,7 @@ HttpClient::Response::AsObject()
}
CbPackage
-HttpClient::Response::AsPackage()
+HttpClient::Response::AsPackage() const
{
// TODO: sanity checks and error handling
if (ResponsePayload)
@@ -392,7 +750,7 @@ HttpClient::Response::AsPackage()
}
std::string_view
-HttpClient::Response::AsText()
+HttpClient::Response::AsText() const
{
if (ResponsePayload)
{
@@ -403,7 +761,7 @@ HttpClient::Response::AsText()
}
std::string
-HttpClient::Response::ToText()
+HttpClient::Response::ToText() const
{
if (!ResponsePayload)
return {};
@@ -438,24 +796,29 @@ HttpClient::Response::IsSuccess() const noexcept
return !Error && IsHttpSuccessCode(StatusCode);
}
+std::string
+HttpClient::Response::ErrorMessage(std::string_view Prefix) const
+{
+ if (Error.has_value())
+ {
+ return fmt::format("{}: {}", Prefix, Error->ErrorMessage);
+ }
+ else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode)
+ {
+ return fmt::format("{}: HTTP error {} {} ({})", Prefix, (int)StatusCode, zen::ToString(StatusCode), AsText());
+ }
+ else
+ {
+ return fmt::format("{}: {}", Prefix, "unknown error");
+ }
+}
+
void
HttpClient::Response::ThrowError(std::string_view ErrorPrefix)
{
if (!IsSuccess())
{
- if (Error.has_value())
- {
- throw std::runtime_error(fmt::format("{}: {}", ErrorPrefix, Error->ErrorMessage));
- }
- else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode)
- {
- throw std::runtime_error(
- fmt::format("{}: HTTP error {} {} ({})", ErrorPrefix, (int)StatusCode, zen::ToString(StatusCode), AsText()));
- }
- else
- {
- throw std::runtime_error(fmt::format("{}: {}", ErrorPrefix, "unknown error"));
- }
+ throw std::runtime_error(ErrorMessage(ErrorPrefix));
}
}