diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-08 08:51:54 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-08 14:51:54 +0200 |
| commit | 2f6a49f8d850806da94a92d0bd23e22735bf19a2 (patch) | |
| tree | ca11d34ade5cdded3d59c37294c1c2a299738800 /src | |
| parent | 0.2.20 (diff) | |
| download | zen-2f6a49f8d850806da94a92d0bd23e22735bf19a2.tar.xz zen-2f6a49f8d850806da94a92d0bd23e22735bf19a2.zip | |
Extend http client (#387)
* extend http client with configuration, headers, parameters and disk streaming upload/download
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/filesystem.cpp | 63 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 511 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/formatters.h | 40 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 101 |
5 files changed, 623 insertions, 93 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 3311ba1b9..8abe5af00 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -676,6 +676,69 @@ WriteFile(std::filesystem::path Path, CompositeBuffer InData) WriteFile(Path, DataPtrs.data(), DataPtrs.size()); } +bool +MoveToFile(std::filesystem::path Path, IoBuffer Data) +{ + if (!Data.IsWholeFile()) + { + return false; + } + IoBufferFileReference FileRef; + if (!Data.GetFileReference(/* out */ FileRef)) + { + return false; + } + +#if ZEN_PLATFORM_WINDOWS + const HANDLE ChunkFileHandle = FileRef.FileHandle; + std::wstring FileName = Path.native(); + const DWORD BufferSize = sizeof(FILE_RENAME_INFO) + gsl::narrow<DWORD>(FileName.size() * sizeof(WCHAR)); + FILE_RENAME_INFO* RenameInfo = reinterpret_cast<FILE_RENAME_INFO*>(Memory::Alloc(BufferSize)); + memset(RenameInfo, 0, BufferSize); + + RenameInfo->ReplaceIfExists = TRUE; + RenameInfo->FileNameLength = gsl::narrow<DWORD>(FileName.size()); + memcpy(RenameInfo->FileName, FileName.c_str(), FileName.size() * sizeof(WCHAR)); + RenameInfo->FileName[FileName.size()] = 0; + + // Try to move file into place + BOOL Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + + if (!Success) + { + DWORD LastError = GetLastError(); + if (LastError == ERROR_PATH_NOT_FOUND) + { + zen::CreateDirectories(Path.parent_path()); + Success = SetFileInformationByHandle(ChunkFileHandle, FileRenameInfo, RenameInfo, BufferSize); + } + } + Memory::Free(RenameInfo); + if (!Success) + { + return false; + } +#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); + int Ret = link(SourcePath.c_str(), Path.c_str()); + if (Ret < 0) + { + int32_t err = errno; + if (err == ENOENT) + { + zen::CreateDirectories(Path.parent_path()); + Ret = link(SourcePath.c_str(), Path.c_str()); + } + } + if (Ret < 0) + { + return false; + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + Data.SetDeleteOnClose(false); + return true; +} + IoBuffer FileContents::Flatten() { diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index 1a582672b..37a562664 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -66,6 +66,7 @@ ZENCORE_API bool ScanFile(std::filesystem::path Path, uint64_t ChunkSize, std::f ZENCORE_API void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount); ZENCORE_API void WriteFile(std::filesystem::path Path, IoBuffer Data); ZENCORE_API void WriteFile(std::filesystem::path Path, CompositeBuffer Data); +ZENCORE_API bool MoveToFile(std::filesystem::path Path, IoBuffer Data); struct CopyFileOptions { diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 744787201..f3a9ad71b 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -5,6 +5,9 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compositebuffer.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> #include <zencore/session.h> @@ -12,6 +15,7 @@ #include <zencore/stream.h> #include <zencore/testing.h> #include <zencore/trace.h> +#include <zenhttp/formatters.h> #include <zenhttp/httpshared.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -53,11 +57,11 @@ AsCprBody(const CompositeBuffer& Buffers) ////////////////////////////////////////////////////////////////////////// HttpClient::Response -ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode) +ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results // into buffer directly - IoBuffer ResponseBuffer = IoBuffer(IoBuffer::Clone, HttpResponse.text.data(), HttpResponse.text.size()); + IoBuffer ResponseBuffer = Payload ? std::move(Payload) : IoBuffer(IoBuffer::Clone, HttpResponse.text.data(), HttpResponse.text.size()); if (auto It = HttpResponse.header.find("Content-Type"); It != HttpResponse.header.end()) { @@ -66,30 +70,50 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp ResponseBuffer.SetContentType(ContentType); } - return HttpClient::Response{.StatusCode = WorkResponseCode, .ResponsePayload = std::move(ResponseBuffer)}; + if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) + { + ZEN_WARN("HttpClient request failed: {}", HttpResponse); + } + + return HttpClient::Response{.StatusCode = WorkResponseCode, + .ResponsePayload = std::move(ResponseBuffer), + .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), + .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes), + .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes), + .ElapsedSeconds = HttpResponse.elapsed}; } HttpClient::Response -CommonResponse(cpr::Response&& HttpResponse) +CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); - - if (HttpResponse.status_code == 0) + if (HttpResponse.error) { - // Client side failure code + ZEN_WARN("HttpClient client error: {}", HttpResponse); + // Client side failure code return HttpClient::Response{ .StatusCode = WorkResponseCode, - .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(HttpResponse.error.message.data(), HttpResponse.error.message.size())}; + .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size()), + .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), + .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes), + .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes), + .ElapsedSeconds = HttpResponse.elapsed, + .Error = HttpClient::ErrorContext{.ErrorCode = gsl::narrow<int>(HttpResponse.error.code), + .ErrorMessage = HttpResponse.error.message}}; } - if (WorkResponseCode == HttpResponseCode::NoContent || HttpResponse.text.empty()) + if (WorkResponseCode == HttpResponseCode::NoContent || (HttpResponse.text.empty() && !Payload)) { - return HttpClient::Response{.StatusCode = WorkResponseCode}; + return HttpClient::Response{.StatusCode = WorkResponseCode, + .Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()), + .UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes), + .DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes), + .ElapsedSeconds = HttpResponse.elapsed}; } else { - return ResponseWithPayload(HttpResponse, WorkResponseCode); + return ResponseWithPayload(HttpResponse, WorkResponseCode, std::move(Payload)); } } @@ -108,6 +132,42 @@ struct HttpClient::Impl : public RefCounted ~Session() { Outer->ReleaseSession(CprSession); } inline cpr::Session* operator->() const { return CprSession; } + inline cpr::Response Get() + { + cpr::Response Result = CprSession->Get(); + ZEN_TRACE("GET {}", Result); + return Result; + } + inline cpr::Response Download(cpr::WriteCallback&& write) + { + cpr::Response Result = CprSession->Download(write); + ZEN_TRACE("GET {}", Result); + return Result; + } + inline cpr::Response Head() + { + cpr::Response Result = CprSession->Head(); + ZEN_TRACE("HEAD {}", Result); + return Result; + } + inline cpr::Response Put() + { + cpr::Response Result = CprSession->Put(); + ZEN_TRACE("PUT {}", Result); + return Result; + } + inline cpr::Response Post() + { + cpr::Response Result = CprSession->Post(); + ZEN_TRACE("POST {}", Result); + return Result; + } + inline cpr::Response Delete() + { + cpr::Response Result = CprSession->Delete(); + ZEN_TRACE("DELETE {}", Result); + return Result; + } private: Impl* Outer; @@ -117,7 +177,11 @@ struct HttpClient::Impl : public RefCounted Session& operator=(Session&&) = delete; }; - Session AllocSession(const std::string_view BaseUrl, const std::string_view Url); + Session AllocSession(const std::string_view BaseUrl, + const std::string_view Url, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters); private: RwLock m_SessionLock; @@ -142,38 +206,223 @@ HttpClient::Impl::~Impl() } HttpClient::Impl::Session -HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view ResourcePath) +HttpClient::Impl::AllocSession(const std::string_view BaseUrl, + const std::string_view ResourcePath, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters) { - RwLock::ExclusiveLockScope _(m_SessionLock); + bool IsNew = false; + cpr::Session* CprSession = nullptr; + m_SessionLock.WithExclusiveLock([&] { + if (m_Sessions.empty()) + { + CprSession = new cpr::Session(); + IsNew = true; + } + else + { + CprSession = m_Sessions.back(); + m_Sessions.pop_back(); + } + }); - ExtendableStringBuilder<128> UrlBuffer; - UrlBuffer << BaseUrl << ResourcePath; + if (IsNew) + { + CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout); + CprSession->SetTimeout(ConnectionSettings.Timeout); + if (ConnectionSettings.AssumeHttp2) + { + CprSession->SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); + } + } - if (m_Sessions.empty()) + if (!AdditionalHeader->empty()) { - cpr::Session* NewSession = new cpr::Session(); - NewSession->SetUrl(UrlBuffer.c_str()); - return Session(this, NewSession); + CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end())); } else { - cpr::Session* NewSession = m_Sessions.back(); - m_Sessions.pop_back(); - - NewSession->SetUrl(UrlBuffer.c_str()); - return Session(this, NewSession); + CprSession->SetHeader({}); + } + if (!Parameters->empty()) + { + cpr::Parameters Tmp; + for (auto It = Parameters->begin(); It != Parameters->end(); It++) + { + Tmp.Add({It->first, It->second}); + } + CprSession->SetParameters(Tmp); + } + else + { + CprSession->SetParameters({}); } + + ExtendableStringBuilder<128> UrlBuffer; + UrlBuffer << BaseUrl << ResourcePath; + CprSession->SetUrl(UrlBuffer.c_str()); + + return Session(this, CprSession); } void HttpClient::Impl::ReleaseSession(cpr::Session* CprSession) { + CprSession->SetUrl({}); + CprSession->SetHeader({}); + CprSession->SetBody({}); m_SessionLock.WithExclusiveLock([&] { m_Sessions.push_back(CprSession); }); } +namespace detail { + + static std::atomic_uint32_t TempFileBaseIndex; + +} // namespace detail + +class TempPayloadFile +{ +public: + TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} + ~TempPayloadFile() + { + if (m_FileHandle) + { +#if ZEN_PLATFORM_WINDOWS + // Mark file for deletion when final handle is closed + FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; + + SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + BOOL Success = CloseHandle(m_FileHandle); +#else + std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle); + unlink(FilePath.c_str()); + int Fd = int(uintptr_t(m_FileHandle)); + bool Success = (close(Fd) == 0); +#endif + if (!Success) + { + ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); + } + + m_FileHandle = nullptr; + } + } + + std::error_code Open(const std::filesystem::path& TempFolderPath) + { + ZEN_ASSERT(m_FileHandle == nullptr); + + std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | + detail::TempFileBaseIndex.fetch_add(1); + + std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex); +#if ZEN_PLATFORM_WINDOWS + LPCWSTR lpFileName = FileName.c_str(); + const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE); + const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; + LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr; + const DWORD dwCreationDisposition = CREATE_ALWAYS; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + const HANDLE hTemplateFile = nullptr; + const HANDLE FileHandle = CreateFile(lpFileName, + dwDesiredAccess, + dwShareMode, + lpSecurityAttributes, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + return MakeErrorCodeFromLastError(); + } +#else // ZEN_PLATFORM_WINDOWS + int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC; + int Fd = open(FileName.c_str(), OpenFlags, 0666); + if (Fd < 0) + { + return MakeErrorCodeFromLastError(); + } + fchmod(Fd, 0666); + + void* FileHandle = (void*)(uintptr_t(Fd)); +#endif // ZEN_PLATFORM_WINDOWS + m_FileHandle = FileHandle; + + return {}; + } + + std::error_code Write(std::string_view DataString) + { + ZEN_ASSERT(m_FileHandle != nullptr); + const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; + const void* Data = DataString.data(); + std::size_t Size = DataString.size(); + + while (Size) + { + const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); + uint64_t NumberOfBytesWritten = 0; +#if ZEN_PLATFORM_WINDOWS + OVERLAPPED Ovl{}; + + Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32); + + DWORD dwNumberOfBytesWritten = 0; + + BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); + if (Success) + { + NumberOfBytesWritten = static_cast<uint64_t>(dwNumberOfBytesWritten); + } +#else + static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); + int Fd = int(uintptr_t(m_FileHandle)); + int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset); + bool Success = (BytesWritten > 0); + if (Success) + { + NumberOfBytesWritten = static_cast<uint64_t>(BytesWritten); + } +#endif + + if (!Success) + { + return MakeErrorCodeFromLastError(); + } + + Size -= NumberOfBytesWritten; + m_WriteOffset += NumberOfBytesWritten; + Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesWritten; + } + return {}; + } + + IoBuffer DetachToIoBuffer() + { + ZEN_ASSERT(m_FileHandle != nullptr); + void* FileHandle = m_FileHandle; + IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); + Buffer.SetDeleteOnClose(true); + m_FileHandle = 0; + m_WriteOffset = 0; + return Buffer; + } + +private: + void* m_FileHandle; + std::uint64_t m_WriteOffset; +}; + ////////////////////////////////////////////////////////////////////////// -HttpClient::HttpClient(std::string_view BaseUri) : m_BaseUri(BaseUri), m_Impl(new Impl) +HttpClient::HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings) +: m_BaseUri(BaseUri) +, m_ConnectionSettings(Connectionsettings) +, m_Impl(new Impl) { StringBuilder<32> SessionId; GetSessionId().ToString(SessionId); @@ -185,11 +434,11 @@ HttpClient::~HttpClient() } HttpClient::Response -HttpClient::TransactPackage(std::string_view Url, CbPackage Package) +HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::TransactPackage"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); // First, list of offered chunks for filtering on the server end @@ -214,10 +463,10 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package) BinaryWriter MemWriter; Writer.Save(MemWriter); - Sess->SetHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()}); - cpr::Response FilterResponse = Sess->Post(); + cpr::Response FilterResponse = Sess.Post(); if (FilterResponse.status_code == 200) { @@ -256,10 +505,10 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package) CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); - Sess->SetHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()}); - cpr::Response FilterResponse = Sess->Post(); + cpr::Response FilterResponse = Sess.Post(); if (!IsHttpSuccessCode(FilterResponse.status_code)) { @@ -284,90 +533,199 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package) // HttpClient::Response -HttpClient::Put(std::string_view Url, const IoBuffer& Payload) +HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Put"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); Sess->SetBody(AsCprBody(Payload)); - Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - return CommonResponse(Sess->Put()); + return CommonResponse(Sess.Put()); } HttpClient::Response -HttpClient::Get(std::string_view Url) +HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); + + return CommonResponse(Sess.Get()); +} - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); +HttpClient::Response +HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) +{ + ZEN_TRACE_CPU("HttpClient::Head"); - return CommonResponse(Sess->Get()); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + + return CommonResponse(Sess.Head()); } HttpClient::Response -HttpClient::Delete(std::string_view Url) +HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - return CommonResponse(Sess->Delete()); + return CommonResponse(Sess.Delete()); } HttpClient::Response -HttpClient::Post(std::string_view Url) +HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); - return CommonResponse(Sess->Post()); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + + return CommonResponse(Sess.Post()); } HttpClient::Response -HttpClient::Post(std::string_view Url, const IoBuffer& Payload) +HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); Sess->SetBody(AsCprBody(Payload)); - Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - return CommonResponse(Sess->Post()); + return CommonResponse(Sess.Post()); } HttpClient::Response -HttpClient::Post(std::string_view Url, CbObject Payload) +HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); Sess->SetBody(AsCprBody(Payload)); - Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}}); - return CommonResponse(Sess->Post()); + return CommonResponse(Sess.Post()); } HttpClient::Response -HttpClient::Post(std::string_view Url, CbPackage Pkg) +HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::PostPackage"); CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); Sess->SetBody(AsCprBody(Message)); - Sess->SetHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}}); - return CommonResponse(Sess->Post()); + return CommonResponse(Sess.Post()); +} + +HttpClient::Response +HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) +{ + ZEN_TRACE_CPU("HttpClient::Upload"); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + + uint64_t Offset = 0; + if (Payload.IsWholeFile()) + { + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + else + { + Sess->SetBody(AsCprBody(Payload)); + } + return CommonResponse(Sess.Put()); +} + +HttpClient::Response +HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) +{ + ZEN_TRACE_CPU("HttpClient::Upload"); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + + return CommonResponse(Sess.Put()); +} + +HttpClient::Response +HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader) +{ + ZEN_TRACE_CPU("HttpClient::Download"); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + + std::string PayloadString; + std::unique_ptr<TempPayloadFile> PayloadFile; + + cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); + return false; + } + PayloadFile->Write(PayloadString); + PayloadString.clear(); + } + if (PayloadFile) + { + std::error_code Ec = PayloadFile->Write(data); + if (Ec) + { + ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + } + else + { + PayloadString.append(data); + } + return true; + }}); + + if (!PayloadString.empty()) + { + Response.text = std::move(PayloadString); + } + + return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } ////////////////////////////////////////////////////////////////////////// CbObject -HttpClient::Response::AsObject() +HttpClient::Response::AsObject() const { // TODO: sanity check the payload format etc @@ -380,7 +738,7 @@ HttpClient::Response::AsObject() } CbPackage -HttpClient::Response::AsPackage() +HttpClient::Response::AsPackage() const { // TODO: sanity checks and error handling if (ResponsePayload) @@ -392,7 +750,7 @@ HttpClient::Response::AsPackage() } std::string_view -HttpClient::Response::AsText() +HttpClient::Response::AsText() const { if (ResponsePayload) { @@ -403,7 +761,7 @@ HttpClient::Response::AsText() } std::string -HttpClient::Response::ToText() +HttpClient::Response::ToText() const { if (!ResponsePayload) return {}; @@ -438,24 +796,29 @@ HttpClient::Response::IsSuccess() const noexcept return !Error && IsHttpSuccessCode(StatusCode); } +std::string +HttpClient::Response::ErrorMessage(std::string_view Prefix) const +{ + if (Error.has_value()) + { + return fmt::format("{}: {}", Prefix, Error->ErrorMessage); + } + else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode) + { + return fmt::format("{}: HTTP error {} {} ({})", Prefix, (int)StatusCode, zen::ToString(StatusCode), AsText()); + } + else + { + return fmt::format("{}: {}", Prefix, "unknown error"); + } +} + void HttpClient::Response::ThrowError(std::string_view ErrorPrefix) { if (!IsSuccess()) { - if (Error.has_value()) - { - throw std::runtime_error(fmt::format("{}: {}", ErrorPrefix, Error->ErrorMessage)); - } - else if (StatusCode != HttpResponseCode::ImATeapot && (int)StatusCode) - { - throw std::runtime_error( - fmt::format("{}: HTTP error {} {} ({})", ErrorPrefix, (int)StatusCode, zen::ToString(StatusCode), AsText())); - } - else - { - throw std::runtime_error(fmt::format("{}: {}", ErrorPrefix, "unknown error")); - } + throw std::runtime_error(ErrorMessage(ErrorPrefix)); } } diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h index 759df58d3..d45f5fbb2 100644 --- a/src/zenhttp/include/zenhttp/formatters.h +++ b/src/zenhttp/include/zenhttp/formatters.h @@ -6,6 +6,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/iobuffer.h> #include <zencore/string.h> +#include <zenhttp/httpclient.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> @@ -69,3 +70,42 @@ struct fmt::formatter<cpr::Response> } } }; + +template<> +struct fmt::formatter<zen::HttpClient::Response> +{ + constexpr auto parse(format_parse_context& Ctx) -> decltype(Ctx.begin()) { return Ctx.end(); } + + template<typename FormatContext> + auto format(const zen::HttpClient::Response& Response, FormatContext& Ctx) -> decltype(Ctx.out()) + { + using namespace std::literals; + + if (Response.IsSuccess()) + { + return fmt::format_to(Ctx.out(), + "OK: Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", + ToString(Response.StatusCode), + Response.UploadedBytes, + Response.DownloadedBytes, + Response.ElapsedSeconds); + } + else if (Response.Error) + { + return fmt::format_to(Ctx.out(), + "Failed: Elapsed: {}s, Error: ({}) '{}", + Response.ElapsedSeconds, + Response.Error.value().ErrorCode, + Response.Error.value().ErrorMessage); + } + else + { + return fmt::format_to(Ctx.out(), + "Failed: Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}", + Response.UploadedBytes, + Response.DownloadedBytes, + Response.ElapsedSeconds, + Response.ErrorMessage(""sv)); + } + } +}; diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 9ff4910bf..18031b280 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -9,10 +9,12 @@ #include <zenhttp/httpcommon.h> #include <optional> +#include <unordered_map> namespace zen { class CbPackage; +class CompositeBuffer; /** HTTP client implementation for Zen use cases @@ -24,22 +26,68 @@ class CbPackage; */ +struct HttpClientSettings +{ + std::chrono::milliseconds ConnectTimeout{3000}; + std::chrono::milliseconds Timeout{}; + bool AssumeHttp2 = false; +}; + class HttpClient { public: - HttpClient(std::string_view BaseUri); + struct Settings + { + }; + HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings = {}); ~HttpClient(); struct ErrorContext { + int ErrorCode; std::string ErrorMessage; }; + struct KeyValueMap + { + KeyValueMap() = default; + std::unordered_map<std::string, std::string> Entries; + + constexpr inline const std::unordered_map<std::string, std::string>* operator->() const { return &Entries; } + constexpr inline std::unordered_map<std::string, std::string>* operator->() { return &Entries; } + constexpr inline const std::unordered_map<std::string, std::string>& operator*() const { return Entries; } + constexpr inline std::unordered_map<std::string, std::string>& operator*() { return Entries; } + + template<typename T> + KeyValueMap(T Begin, T End) : Entries(Begin, End) + { + } + KeyValueMap(std::pair<std::string, std::string>&& Entry) : Entries({{Entry}}) {} + KeyValueMap(std::pair<std::string_view, std::string_view>&& Entry) + : Entries({{{std::string(Entry.first), std::string(Entry.second)}}}) + { + } + KeyValueMap(std::span<std::pair<std::string_view, std::string_view>>&& List) : Entries(List.begin(), List.end()) {} + KeyValueMap(std::initializer_list<std::pair<std::string_view, std::string_view>>&& List) : Entries(List.begin(), List.end()) {} + }; + struct Response { HttpResponseCode StatusCode = HttpResponseCode::ImATeapot; IoBuffer ResponsePayload; // Note: this also includes the content type + // Contains the reponse headers + KeyValueMap Header; + + // The number of bytes sent as part of the request + int64_t UploadedBytes; + + // The number of bytes recevied as part of the response + int64_t DownloadedBytes; + + // The elapsed time in seconds for the request to execute + double ElapsedSeconds; + // This contains any errors from the HTTP stack. It won't contain information on // why the server responded with a non-success HTTP status, that may be gleaned // from the response payload itself depending on what the server provides. @@ -47,19 +95,19 @@ public: // Return the response payload as a CbObject. Note that this does not attempt to // validate that the content type or content itself makes sense as a CbObject - CbObject AsObject(); + CbObject AsObject() const; // Return the response payload as a CbPackage. Note that this does not attempt to // validate that the content type or content itself makes sense as a CbPackage - CbPackage AsPackage(); + CbPackage AsPackage() const; // Return the response payload as a string. Note that this does not attempt to // validate that the content type or content itself makes sense as a string. - std::string_view AsText(); + std::string_view AsText() const; // Return text representation of the payload. Formats into JSON for structured // objects, returns text as-is for text types like Text, JSON, HTML etc - std::string ToText(); + std::string ToText() const; // Returns whether the HTTP status code is considered successful (i.e in the // 2xx range) @@ -67,26 +115,41 @@ public: inline explicit operator bool() const noexcept { return IsSuccess(); } void ThrowError(std::string_view ErrorPrefix = "error"); - }; - - [[nodiscard]] Response Put(std::string_view Url, const IoBuffer& Payload); - [[nodiscard]] Response Get(std::string_view Url); - [[nodiscard]] Response Delete(std::string_view Url); - [[nodiscard]] Response Post(std::string_view Url); - [[nodiscard]] Response Post(std::string_view Url, const IoBuffer& Payload); - [[nodiscard]] Response Post(std::string_view Url, CbObject Payload); - [[nodiscard]] Response Post(std::string_view Url, CbPackage Payload); - [[nodiscard]] Response TransactPackage(std::string_view Url, CbPackage Package); + std::string ErrorMessage(std::string_view Prefix) const; + }; - inline std::string GetBaseUri() const { return m_BaseUri; } + [[nodiscard]] Response Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Get(std::string_view Url, const KeyValueMap& AdditionalHeader = {}, const KeyValueMap& Parameters = {}); + [[nodiscard]] Response Head(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Delete(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Post(std::string_view Url, CbPackage Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Upload(std::string_view Url, + const CompositeBuffer& Payload, + ZenContentType ContentType, + const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Download(std::string_view Url, + const std::filesystem::path& TempFolderPath, + const KeyValueMap& AdditionalHeader = {}); + + [[nodiscard]] Response TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader = {}); + + static std::pair<std::string_view, std::string_view> Accept(ZenContentType ContentType) + { + return std::make_pair("Accept", MapContentTypeToString(ContentType)); + } private: struct Impl; - std::string m_BaseUri; - std::string m_SessionId; - Ref<Impl> m_Impl; + std::string m_BaseUri; + std::string m_SessionId; + const HttpClientSettings m_ConnectionSettings; + Ref<Impl> m_Impl; }; void httpclient_forcelink(); // internal |