diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenhttp | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenhttp')
| -rw-r--r-- | src/zenhttp/auth/authmgr.cpp | 2 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 836 | ||||
| -rw-r--r-- | src/zenhttp/httpclientauth.cpp | 111 | ||||
| -rw-r--r-- | src/zenhttp/httpserver.cpp | 193 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/formatters.h | 99 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 8 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclientauth.h | 4 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 23 | ||||
| -rw-r--r-- | src/zenhttp/packageformat.cpp | 69 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 21 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpmulti.cpp | 4 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 31 | ||||
| -rw-r--r-- | src/zenhttp/transports/dlltransport.cpp | 92 | ||||
| -rw-r--r-- | src/zenhttp/transports/dlltransport.h | 2 |
14 files changed, 1096 insertions, 399 deletions
diff --git a/src/zenhttp/auth/authmgr.cpp b/src/zenhttp/auth/authmgr.cpp index 1a9892d5c..8f7befc80 100644 --- a/src/zenhttp/auth/authmgr.cpp +++ b/src/zenhttp/auth/authmgr.cpp @@ -379,7 +379,7 @@ private: AuthState.EndArray(); } - std::filesystem::create_directories(m_Config.RootDirectory); + CreateDirectories(m_Config.RootDirectory); std::optional<std::string> Reason; diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 8052a8fd5..a2d323b5e 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -12,13 +12,19 @@ #include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zencore/memory/memory.h> #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> #include <zencore/string.h> -#include <zencore/testing.h> #include <zencore/trace.h> +#if ZEN_WITH_TESTS +# include <zencore/basicfile.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -42,9 +48,13 @@ namespace detail { class TempPayloadFile { public: + TempPayloadFile(const TempPayloadFile&) = delete; + TempPayloadFile& operator=(const TempPayloadFile&) = delete; + TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} ~TempPayloadFile() { + ZEN_TRACE_CPU("TempPayloadFile::Close"); try { if (m_FileHandle) @@ -85,8 +95,9 @@ namespace detail { } } - std::error_code Open(const std::filesystem::path& TempFolderPath) + std::error_code Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) { + ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | @@ -126,11 +137,14 @@ namespace detail { #endif // ZEN_PLATFORM_WINDOWS m_FileHandle = FileHandle; + PrepareFileForScatteredWrite(m_FileHandle, FinalSize); + return {}; } std::error_code Write(std::string_view DataString) { + ZEN_TRACE_CPU("TempPayloadFile::Write"); const uint8_t* DataPtr = (const uint8_t*)DataString.data(); size_t DataSize = DataString.size(); if (DataSize >= CacheBufferSize) @@ -165,6 +179,7 @@ namespace detail { IoBuffer DetachToIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -180,6 +195,7 @@ namespace detail { IoBuffer BorrowIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -193,6 +209,7 @@ namespace detail { uint64_t GetSize() const { return m_WriteOffset; } void ResetWritePos(uint64_t WriteOffset) { + ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); Flush(); m_WriteOffset = WriteOffset; } @@ -200,6 +217,7 @@ namespace detail { private: std::error_code Flush() { + ZEN_TRACE_CPU("TempPayloadFile::Flush"); if (m_CacheBufferOffset == 0) { return {}; @@ -211,6 +229,7 @@ namespace detail { std::error_code AppendData(const void* Data, uint64_t Size) { + ZEN_TRACE_CPU("TempPayloadFile::AppendData"); ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; @@ -261,6 +280,167 @@ namespace detail { std::uint64_t m_CacheBufferOffset = 0; }; + class BufferedReadFileStream + { + public: + BufferedReadFileStream(const BufferedReadFileStream&) = delete; + BufferedReadFileStream& operator=(const BufferedReadFileStream&) = delete; + + BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) + : m_FileHandle(FileHandle) + , m_FileSize(FileSize) + , m_FileEnd(FileOffset + FileSize) + , m_BufferSize(Min(BufferSize, FileSize)) + , m_FileOffset(FileOffset) + { + } + + ~BufferedReadFileStream() { Memory::Free(m_Buffer); } + void Read(void* Data, uint64_t Size) + { + ZEN_ASSERT(Data != nullptr); + if (Size > m_BufferSize) + { + Read(Data, Size, m_FileOffset); + m_FileOffset += Size; + return; + } + uint8_t* WritePtr = ((uint8_t*)Data); + uint64_t Begin = m_FileOffset; + uint64_t End = m_FileOffset + Size; + ZEN_ASSERT(m_FileOffset >= m_BufferStart); + if (m_FileOffset < m_BufferEnd) + { + ZEN_ASSERT(m_Buffer != nullptr); + uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); + Begin += Count; + if (Begin == End) + { + m_FileOffset = End; + return; + } + } + if (End == m_FileEnd) + { + Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); + } + else + { + if (!m_Buffer) + { + m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); + m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize)); + } + m_BufferStart = Begin; + m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); + Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); + ZEN_ASSERT(Begin + Count == End); + } + m_FileOffset = End; + } + + private: + void Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) + { + const uint64_t MaxChunkSize = 1u * 1024 * 1024; + std::error_code Ec; + ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); + + if (Ec) + { + std::error_code DummyEc; + throw std::system_error( + Ec, + fmt::format( + "HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", + FileOffset, + BytesToRead, + PathFromHandle(m_FileHandle, DummyEc).generic_string(), + m_FileSize)); + } + } + + void* m_FileHandle = nullptr; + const uint64_t m_FileSize = 0; + const uint64_t m_FileEnd = 0; + uint64_t m_BufferSize = 0; + uint8_t* m_Buffer = nullptr; + uint64_t m_BufferStart = 0; + uint64_t m_BufferEnd = 0; + uint64_t m_FileOffset = 0; + }; + + class CompositeBufferReadStream + { + public: + CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) + : m_Data(Data) + , m_BufferSize(BufferSize) + , m_SegmentIndex(0) + , m_BytesLeftInSegment(0) + { + } + uint64_t Read(void* Data, uint64_t Size) + { + uint64_t Result = 0; + uint8_t* WritePtr = (uint8_t*)Data; + while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) + { + if (m_BytesLeftInSegment == 0) + { + const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + m_BufferSize); + } + else + { + m_SegmentMemoryBuffer = Segment.GetView(); + } + m_BytesLeftInSegment = Segment.GetSize(); + } + uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); + if (m_SegmentDiskBuffer) + { + m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); + } + else + { + ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); + memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); + m_SegmentMemoryBuffer.MidInline(BytesToRead); + } + WritePtr += BytesToRead; + Size -= BytesToRead; + Result += BytesToRead; + + m_BytesLeftInSegment -= BytesToRead; + if (m_BytesLeftInSegment == 0) + { + m_SegmentDiskBuffer.reset(); + m_SegmentMemoryBuffer.Reset(); + m_SegmentIndex++; + } + } + return Result; + } + + private: + const CompositeBuffer& m_Data; + const uint64_t m_BufferSize; + size_t m_SegmentIndex; + std::unique_ptr<BufferedReadFileStream> m_SegmentDiskBuffer; + MemoryView m_SegmentMemoryBuffer; + uint64_t m_BytesLeftInSegment; + }; + } // namespace detail ////////////////////////////////////////////////////////////////////////// @@ -282,7 +462,7 @@ AsCprBody(const IoBuffer& Obj) ////////////////////////////////////////////////////////////////////////// static HttpClient::Response -ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) +ResponseWithPayload(std::string_view SessionId, cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results // into buffer directly @@ -297,7 +477,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) { - ZEN_WARN("HttpClient request failed: {}", HttpResponse); + ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse); } return HttpClient::Response{.StatusCode = WorkResponseCode, @@ -309,12 +489,16 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp } static HttpClient::Response -CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) +CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); if (HttpResponse.error) { - ZEN_WARN("HttpClient client error: {}", HttpResponse); + if (HttpResponse.error.code != cpr::ErrorCode::OPERATION_TIMEDOUT && + HttpResponse.error.code != cpr::ErrorCode::CONNECTION_FAILURE && HttpResponse.error.code != cpr::ErrorCode::REQUEST_CANCELLED) + { + ZEN_WARN("HttpClient client failure (session: {}): {}", SessionId, HttpResponse); + } // Client side failure code return HttpClient::Response{ @@ -339,6 +523,7 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) else { return ResponseWithPayload( + SessionId, HttpResponse, WorkResponseCode, Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size())); @@ -352,9 +537,10 @@ ShouldRetry(const cpr::Response& Response) { case cpr::ErrorCode::OK: break; - case cpr::ErrorCode::OPERATION_TIMEDOUT: + case cpr::ErrorCode::INTERNAL_ERROR: case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: case cpr::ErrorCode::NETWORK_SEND_FAILURE: + case cpr::ErrorCode::OPERATION_TIMEDOUT: return true; default: return false; @@ -364,6 +550,7 @@ ShouldRetry(const cpr::Response& Response) case HttpResponseCode::RequestTimeout: case HttpResponseCode::TooManyRequests: case HttpResponseCode::InternalServerError: + case HttpResponseCode::BadGateway: case HttpResponseCode::ServiceUnavailable: case HttpResponseCode::GatewayTimeout: return true; @@ -375,6 +562,7 @@ ShouldRetry(const cpr::Response& Response) static bool ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile) { + ZEN_TRACE_CPU("ValidatePayload"); IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer() : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); @@ -396,6 +584,11 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } } + if (Response.status_code == (long)HttpResponseCode::PartialContent) + { + return true; + } + if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end()) { IoHash ExpectedPayloadHash; @@ -448,22 +641,40 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } static cpr::Response -DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount) +DoWithRetry( + std::string_view SessionId, + std::function<cpr::Response()>&& Func, + uint8_t RetryCount, + std::function<bool(cpr::Response& Result)>&& Validate = [](cpr::Response&) { return true; }) { uint8_t Attempt = 0; cpr::Response Result = Func(); - while (Attempt < RetryCount && ShouldRetry(Result)) + while (Attempt < RetryCount) { + if (!ShouldRetry(Result)) + { + if (Result.error || !IsHttpSuccessCode(Result.status_code)) + { + break; + } + if (Validate(Result)) + { + break; + } + } Sleep(100 * (Attempt + 1)); Attempt++; - ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; } static cpr::Response -DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempPayloadFile>& PayloadFile, uint8_t RetryCount) +DoWithRetry(std::string_view SessionId, + std::function<cpr::Response()>&& Func, + std::unique_ptr<detail::TempPayloadFile>& PayloadFile, + uint8_t RetryCount) { uint8_t Attempt = 0; cpr::Response Result = Func(); @@ -482,7 +693,7 @@ DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempP } Sleep(100 * (Attempt + 1)); Attempt++; - ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; @@ -511,12 +722,14 @@ struct HttpClient::Impl : public RefCounted inline cpr::Session* operator->() const { return CprSession; } inline cpr::Response Get() { + ZEN_TRACE_CPU("HttpClient::Impl::Get"); cpr::Response Result = CprSession->Get(); ZEN_TRACE("GET {}", Result); return Result; } inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Download"); if (Header) { CprSession->SetHeaderCallback(std::move(Header.value())); @@ -529,12 +742,14 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Head() { + ZEN_TRACE_CPU("HttpClient::Impl::Head"); cpr::Response Result = CprSession->Head(); ZEN_TRACE("HEAD {}", Result); return Result; } inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Put"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -546,6 +761,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Post"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -557,6 +773,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Delete() { + ZEN_TRACE_CPU("HttpClient::Impl::Delete"); cpr::Response Result = CprSession->Delete(); ZEN_TRACE("DELETE {}", Result); return Result; @@ -596,6 +813,7 @@ HttpClient::Impl::Impl(LoggerRef Log) : m_Log(Log) HttpClient::Impl::~Impl() { + ZEN_TRACE_CPU("HttpClient::Impl::~Impl"); m_SessionLock.WithExclusiveLock([&] { for (auto CprSession : m_Sessions) { @@ -614,6 +832,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view SessionId, std::optional<HttpClientAccessToken> AccessToken) { + ZEN_TRACE_CPU("HttpClient::Impl::AllocSession"); cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { if (!m_Sessions.empty()) @@ -670,6 +889,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, void HttpClient::Impl::ReleaseSession(cpr::Session* CprSession) { + ZEN_TRACE_CPU("HttpClient::Impl::ReleaseSession"); CprSession->SetUrl({}); CprSession->SetHeader({}); CprSession->SetBody({}); @@ -694,6 +914,7 @@ HttpClient::~HttpClient() bool HttpClient::Authenticate() { + ZEN_TRACE_CPU("HttpClient::Authenticate"); std::optional<HttpClientAccessToken> Token = GetAccessToken(); if (!Token) { @@ -705,6 +926,7 @@ HttpClient::Authenticate() const std::optional<HttpClientAccessToken> HttpClient::GetAccessToken() { + ZEN_TRACE_CPU("HttpClient::GetAccessToken"); if (!m_ConnectionSettings.AccessTokenProvider.has_value()) { return {}; @@ -829,15 +1051,18 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap { ZEN_TRACE_CPU("HttpClient::Put"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -845,31 +1070,40 @@ HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Put"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - {{"Content-Length", "0"}}, - Parameters, - m_SessionId, - GetAccessToken()); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse(m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + {{"Content-Length", "0"}}, + Parameters, + m_SessionId, + GetAccessToken()); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Get(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount, + [](cpr::Response& Result) { + std::unique_ptr<detail::TempPayloadFile> NoTempFile; + return ValidatePayload(Result, NoTempFile); + })); } HttpClient::Response @@ -877,13 +1111,16 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Head(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -891,13 +1128,16 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Delete(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -905,13 +1145,16 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -925,16 +1168,32 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) + { + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; + }; + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + Sess->SetBody(AsCprBody(Payload)); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -942,16 +1201,19 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -965,24 +1227,23 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten { ZEN_TRACE_CPU("HttpClient::Post"); - return CommonResponse(DoWithRetry( - [&]() { - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); + return true; + }; + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -990,29 +1251,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - Sess->SetBody(AsCprBody(Payload)); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) + { + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + Sess->SetBody(AsCprBody(Payload)); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1020,24 +1284,23 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1048,6 +1311,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::string PayloadString; std::unique_ptr<detail::TempPayloadFile> PayloadFile; cpr::Response Response = DoWithRetry( + m_SessionId, [&]() { auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> { size_t DelimiterPos = header.find(':'); @@ -1087,6 +1351,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold return true; }; + uint64_t RequestedContentLength = (uint64_t)-1; + if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) + { + if (RangeIt->second.starts_with("bytes")) + { + size_t RangeStartPos = RangeIt->second.find('=', 5); + if (RangeStartPos != std::string::npos) + { + RangeStartPos++; + size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos); + if (RangeSplitPos != std::string::npos) + { + std::optional<size_t> RequestedRangeStart = + ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos)); + std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1)); + if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) + { + RequestedContentLength = RequestedRangeEnd.value() - 1; + } + } + } + } + } + cpr::Response Response; { std::vector<std::pair<std::string, std::string>> ReceivedHeaders; @@ -1094,13 +1382,13 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::pair<std::string, std::string> Header = GetHeader(header); if (Header.first == "Content-Length"sv) { - std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); - if (ContentSize.has_value()) + std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second); + if (ContentLength.has_value()) { - if (ContentSize.value() > 1024 * 1024) + if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique<detail::TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); + std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value()); if (Ec) { ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", @@ -1111,7 +1399,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } else { - PayloadString.reserve(ContentSize.value()); + PayloadString.reserve(ContentLength.value()); } } } @@ -1157,85 +1445,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold auto It = Response.header.find("Content-Length"); if (It != Response.header.end()) { - std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); - if (ContentLength) - { - std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; - auto HeaderCallback = [&](std::string header, intptr_t) { - std::pair<std::string, std::string> Header = GetHeader(header); - if (!Header.first.empty()) - { - ReceivedHeaders.emplace_back(std::move(Header)); - } + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Header.first == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Header.second.starts_with("bytes "sv)) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Header.second.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } - return 1; - }; + return 0; + } + return 1; + }; - KeyValueMap HeadersWithRange(AdditionalHeader); - do - { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - std::string Range = fmt::format("bytes={}-{}", DownloadedSize, ContentLength.value()); - if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + uint64_t ContentLength = RequestedContentLength; + if (ContentLength == uint64_t(-1)) + { + if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value()) { - if (RangeIt->second == Range) - { - // If we didn't make any progress, abort - break; - } + ContentLength = ParsedContentLength.value(); } - HeadersWithRange.Entries.insert_or_assign("Range", Range); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - HeadersWithRange, - {}, - m_SessionId, - GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); - for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + } + + std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) { - Response.header.insert_or_assign(H.first, H.second); + // If we didn't make any progress, abort + break; } - ReceivedHeaders.clear(); - } while (ShouldResume(Response)); - } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } + ReceivedHeaders.clear(); + } while (ShouldResume(Response)); } } } @@ -1249,7 +1542,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold PayloadFile, m_ConnectionSettings.RetryCount); - return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); + return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } ////////////////////////////////////////////////////////////////////////// @@ -1361,6 +1654,133 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix) #if ZEN_WITH_TESTS +namespace testutil { + IoHash HashComposite(const CompositeBuffer& Payload) + { + IoHashStream Hasher; + const uint64_t PayloadSize = Payload.GetSize(); + std::vector<uint8_t> Buffer(64u * 1024u); + detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); + for (uint64_t Offset = 0; Offset < PayloadSize;) + { + uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + }; + + IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) + { + IoHashStream Hasher; + std::vector<uint8_t> Buffer(64u * 1024u); + detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); + for (uint64_t Offset = 0; Offset < FileSize;) + { + uint64_t Count = Min(64u * 1024u, FileSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + } + +} // namespace testutil + +TEST_CASE("responseformat") +{ + using namespace std::literals; + + SUBCASE("identity") + { + BodyLogFormatter _{"abcd"}; + CHECK_EQ(_.GetText(), "abcd"sv); + } + + SUBCASE("very long") + { + std::string_view LongView = + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; + + BodyLogFormatter _{LongView}; + + CHECK(_.GetText().size() < LongView.size()); + CHECK(_.GetText().starts_with("[truncated"sv)); + } + + SUBCASE("invalid text") + { + std::string_view BadText = "totobaba\xff\xfe"; + + BodyLogFormatter _{BadText}; + + CHECK_EQ(_.GetText(), "totobaba"); + } +} + +TEST_CASE("BufferedReadFileStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + CHECK(DiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); + CHECK(Partial.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK(SmallDiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), + testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); +} + +TEST_CASE("CompositeBufferReadStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer MemoryBuffer1 = CreateRandomBlob(64); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); + + IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); + + IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); + + IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); + + IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); + + CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), + SharedBuffer(std::move(DiskBuffer1)), + SharedBuffer(std::move(DiskBuffer2)), + SharedBuffer(std::move(MemoryBuffer2)), + SharedBuffer(std::move(DiskBuffer3))); + CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); +} + TEST_CASE("httpclient") { using namespace std::literals; diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp index 04ac2ad3f..39efe1d0c 100644 --- a/src/zenhttp/httpclientauth.cpp +++ b/src/zenhttp/httpclientauth.cpp @@ -2,14 +2,26 @@ #include <zenhttp/httpclientauth.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/process.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/uid.h> #include <zenhttp/auth/authmgr.h> +#include <ctime> + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> #include <fmt/format.h> #include <json11.hpp> ZEN_THIRD_PARTY_INCLUDES_END +#if ZEN_PLATFORM_WINDOWS +# define timegm _mkgmtime +#endif // ZEN_PLATFORM_WINDOWS + namespace zen { namespace httpclientauth { using namespace std::literals; @@ -41,6 +53,7 @@ namespace zen { namespace httpclientauth { if (Response.error || Response.status_code != 200) { + ZEN_WARN("Failed fetching OAuth access token {}. Reason: '{}'", OAuthParams.Url, Response.reason); return HttpClientAccessToken{}; } @@ -49,6 +62,7 @@ namespace zen { namespace httpclientauth { if (JsonError.empty() == false) { + ZEN_WARN("Unable to parse OAuth json response from {}. Reason: '{}'", OAuthParams.Url, JsonError); return HttpClientAccessToken{}; } @@ -73,4 +87,101 @@ namespace zen { namespace httpclientauth { return CreateFromOpenIdProvider(AuthManager, "Default"sv); } + static HttpClientAccessToken GetOidcTokenFromExe(const std::filesystem::path& OidcExecutablePath, + std::string_view CloudHost, + bool Unattended) + { + Stopwatch Timer; + + CreateProcOptions ProcOptions; + + const std::filesystem::path AuthTokenPath(std::filesystem::temp_directory_path() / fmt::format(".zen-auth-{}", Oid::NewOid())); + auto _ = MakeGuard([AuthTokenPath]() { RemoveFile(AuthTokenPath); }); + + const std::string ProcArgs = fmt::format("{} --AuthConfigUrl {} --OutFile {} --Unattended={}", + OidcExecutablePath, + CloudHost, + AuthTokenPath, + Unattended ? "true"sv : "false"sv); + ZEN_DEBUG("Running: {}", ProcArgs); + ProcessHandle Proc; + Proc.Initialize(CreateProc(OidcExecutablePath, ProcArgs, ProcOptions)); + if (!Proc.IsValid()) + { + throw std::runtime_error(fmt::format("failed to launch '{}'", OidcExecutablePath)); + } + + int ExitCode = Proc.WaitExitCode(); + + auto EndTime = std::chrono::system_clock::now(); + + if (ExitCode == 0) + { + IoBuffer Body = IoBufferBuilder::MakeFromFile(AuthTokenPath); + std::string JsonText(reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()); + + std::string JsonError; + json11::Json Json = json11::Json::parse(JsonText, JsonError); + + if (JsonError.empty() == false) + { + ZEN_WARN("Unable to parse Oidcs json response from {}. Reason: '{}'", AuthTokenPath, JsonError); + return HttpClientAccessToken{}; + } + std::string Token = Json["Token"].string_value(); + std::string ExpiresAtUTCString = Json["ExpiresAtUtc"].string_value(); + ZEN_ASSERT(!ExpiresAtUTCString.empty()); + + int Year = 0; + int Month = 0; + int Day = 0; + int Hour = 0; + int Minute = 0; + int Second = 0; + int Millisecond = 0; + sscanf(ExpiresAtUTCString.c_str(), "%d-%d-%dT%d:%d:%d.%dZ", &Year, &Month, &Day, &Hour, &Minute, &Second, &Millisecond); + + std::tm Time = { + Second, + Minute, + Hour, + Day, + Month - 1, + Year - 1900, + }; + + time_t UTCTime = timegm(&Time); + HttpClientAccessToken::TimePoint ExpireTime = std::chrono::system_clock::from_time_t(UTCTime); + ExpireTime += std::chrono::microseconds(Millisecond); + + return HttpClientAccessToken{.Value = fmt::format("Bearer {}"sv, Token), .ExpireTime = ExpireTime}; + } + else + { + ZEN_WARN("Failed running {} to get auth token, error code {}", OidcExecutablePath, ExitCode); + } + return HttpClientAccessToken{}; + } + + std::optional<std::function<HttpClientAccessToken()>> CreateFromOidcTokenExecutable(const std::filesystem::path& OidcExecutablePath, + std::string_view CloudHost) + { + HttpClientAccessToken InitialToken = GetOidcTokenFromExe(OidcExecutablePath, CloudHost, false); + if (InitialToken.IsValid()) + { + return [OidcExecutablePath = std::filesystem::path(OidcExecutablePath), + CloudHost = std::string(CloudHost), + InitialToken]() mutable { + if (InitialToken.IsValid()) + { + HttpClientAccessToken Result = InitialToken; + InitialToken = {}; + return Result; + } + return GetOidcTokenFromExe(OidcExecutablePath, CloudHost, true); + }; + } + return {}; + } + }} // namespace zen::httpclientauth diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index 1fbe22628..764f2a2a7 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -31,6 +31,8 @@ #include <span> #include <string_view> +#include <EASTL/fixed_vector.h> + namespace zen { using namespace std::literals; @@ -529,7 +531,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType { std::span<const SharedBuffer> Segments = Payload.GetSegments(); - std::vector<IoBuffer> Buffers; + eastl::fixed_vector<IoBuffer, 64> Buffers; Buffers.reserve(Segments.size()); for (auto& Segment : Segments) @@ -537,7 +539,7 @@ HttpServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentType Buffers.push_back(Segment.AsIoBuffer()); } - WriteResponse(ResponseCode, ContentType, Buffers); + WriteResponse(ResponseCode, ContentType, std::span<IoBuffer>(begin(Buffers), end(Buffers))); } std::string @@ -785,120 +787,131 @@ HttpRpcHandler::AddRpc(std::string_view RpcId, std::function<void(CbObject& RpcA ////////////////////////////////////////////////////////////////////////// -enum class HttpServerClass -{ - kHttpAsio, - kHttpSys, - kHttpPlugin, - kHttpMulti, - kHttpNull -}; - Ref<HttpServer> -CreateHttpServerClass(HttpServerClass Class, const HttpServerConfig& Config) +CreateHttpServerClass(const std::string_view ServerClass, const HttpServerConfig& Config) { - switch (Class) + if (ServerClass == "asio"sv) { - default: - case HttpServerClass::kHttpAsio: - ZEN_INFO("using asio HTTP server implementation"); - return CreateHttpAsioServer(Config.ForceLoopback, Config.ThreadCount); - - case HttpServerClass::kHttpMulti: - { - ZEN_INFO("using multi HTTP server implementation"); - Ref<HttpMultiServer> Server{new HttpMultiServer()}; - - // This is hardcoded for now, but should be configurable in the future - Server->AddServer(CreateHttpServerClass(HttpServerClass::kHttpSys, Config)); - Server->AddServer(CreateHttpServerClass(HttpServerClass::kHttpPlugin, Config)); + ZEN_INFO("using asio HTTP server implementation") + return CreateHttpAsioServer(Config.ForceLoopback, Config.ThreadCount); + } +#if ZEN_WITH_HTTPSYS + else if (ServerClass == "httpsys"sv) + { + ZEN_INFO("using http.sys server implementation") + return Ref<HttpServer>(CreateHttpSysServer({.ThreadCount = Config.ThreadCount, + .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount, + .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, + .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled, + .IsDedicatedServer = Config.IsDedicatedServer, + .ForceLoopback = Config.ForceLoopback})); + } +#endif + else if (ServerClass == "null"sv) + { + ZEN_INFO("using null HTTP server implementation") + return Ref<HttpServer>(new HttpNullServer); + } + else + { + ZEN_WARN("unknown HTTP server implementation '{}', falling back to default", ServerClass) - return Server; - } +#if ZEN_WITH_HTTPSYS + return CreateHttpServerClass("httpsys"sv, Config); +#else + return CreateHttpServerClass("asio"sv, Config); +#endif + } +} #if ZEN_WITH_PLUGINS - case HttpServerClass::kHttpPlugin: - { - ZEN_INFO("using plugin HTTP server implementation"); - Ref<HttpPluginServer> Server{CreateHttpPluginServer()}; +Ref<HttpServer> +CreateHttpServerPlugin(const HttpServerPluginConfig& PluginConfig) +{ + const std::string& PluginName = PluginConfig.PluginName; - // This is hardcoded for now, but should be configurable in the future + ZEN_INFO("using '{}' plugin HTTP server implementation", PluginName) + if (PluginName.starts_with("builtin:"sv)) + { # if 0 - Ref<TransportPlugin> WinsockPlugin{CreateSocketTransportPlugin()}; - WinsockPlugin->Configure("port", "8558"); - Server->AddPlugin(WinsockPlugin); -# endif + Ref<TransportPlugin> Plugin = {}; + if (PluginName == "builtin:winsock"sv) + { + Plugin = CreateSocketTransportPlugin(); + } + else if (PluginName == "builtin:asio"sv) + { + Plugin = CreateAsioTransportPlugin(); + } + else + { + ZEN_WARN("Unknown builtin plugin '{}'", PluginName) + return {}; + } -# if 0 - Ref<TransportPlugin> AsioPlugin{CreateAsioTransportPlugin()}; - AsioPlugin->Configure("port", "8558"); - Server->AddPlugin(AsioPlugin); -# endif + ZEN_ASSERT(!Plugin.IsNull()); -# if 1 - Ref<DllTransportPlugin> DllPlugin{CreateDllTransportPlugin()}; - DllPlugin->LoadDll("winsock"); - DllPlugin->ConfigureDll("winsock", "port", "8558"); - Server->AddPlugin(DllPlugin); -# endif + for (const std::pair<std::string, std::string>& Option : PluginConfig.PluginOptions) + { + Plugin->Configure(Option.first.c_str(), Option.second.c_str()); + } - return Server; - } -#endif + Ref<HttpPluginServer> Server{CreateHttpPluginServer()}; + Server->AddPlugin(Plugin); + return Server; +# else + ZEN_WARN("Builtin plugin '{}' is not supported", PluginName) + return {}; +# endif + } -#if ZEN_WITH_HTTPSYS - case HttpServerClass::kHttpSys: - ZEN_INFO("using http.sys server implementation"); - return Ref<HttpServer>(CreateHttpSysServer({.ThreadCount = Config.ThreadCount, - .AsyncWorkThreadCount = Config.HttpSys.AsyncWorkThreadCount, - .IsAsyncResponseEnabled = Config.HttpSys.IsAsyncResponseEnabled, - .IsRequestLoggingEnabled = Config.HttpSys.IsRequestLoggingEnabled, - .IsDedicatedServer = Config.IsDedicatedServer, - .ForceLoopback = Config.ForceLoopback})); -#endif + Ref<DllTransportPlugin> DllPlugin{CreateDllTransportPlugin()}; + if (!DllPlugin->LoadDll(PluginName)) + { + return {}; + } - case HttpServerClass::kHttpNull: - ZEN_INFO("using null HTTP server implementation"); - return Ref<HttpServer>(new HttpNullServer); + for (const std::pair<std::string, std::string>& Option : PluginConfig.PluginOptions) + { + DllPlugin->ConfigureDll(PluginName, Option.first.c_str(), Option.second.c_str()); } + + Ref<HttpPluginServer> Server{CreateHttpPluginServer()}; + Server->AddPlugin(DllPlugin); + return Server; } +#endif Ref<HttpServer> CreateHttpServer(const HttpServerConfig& Config) { using namespace std::literals; - HttpServerClass Class = HttpServerClass::kHttpNull; - -#if ZEN_WITH_HTTPSYS - Class = HttpServerClass::kHttpSys; -#else - Class = HttpServerClass::kHttpAsio; -#endif - - if (Config.ServerClass == "asio"sv) - { - Class = HttpServerClass::kHttpAsio; - } - else if (Config.ServerClass == "httpsys"sv) - { - Class = HttpServerClass::kHttpSys; - } - else if (Config.ServerClass == "plugin"sv) - { - Class = HttpServerClass::kHttpPlugin; - } - else if (Config.ServerClass == "null"sv) +#if ZEN_WITH_PLUGINS + if (Config.PluginConfigs.empty()) { - Class = HttpServerClass::kHttpNull; + return CreateHttpServerClass(Config.ServerClass, Config); } - else if (Config.ServerClass == "multi"sv) + else { - Class = HttpServerClass::kHttpMulti; - } + Ref<HttpMultiServer> Server{new HttpMultiServer()}; + Server->AddServer(CreateHttpServerClass(Config.ServerClass, Config)); - return CreateHttpServerClass(Class, Config); + for (const HttpServerPluginConfig& PluginConfig : Config.PluginConfigs) + { + Ref<HttpServer> PluginServer = CreateHttpServerPlugin(PluginConfig); + if (!PluginServer.IsNull()) + { + Server->AddServer(PluginServer); + } + } + + return Server; + } +#else + return CreateHttpServerClass(Config.ServerClass, Config); +#endif } ////////////////////////////////////////////////////////////////////////// diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h index 538136238..05a23d675 100644 --- a/src/zenhttp/include/zenhttp/formatters.h +++ b/src/zenhttp/include/zenhttp/formatters.h @@ -7,12 +7,57 @@ #include <zencore/iobuffer.h> #include <zencore/string.h> #include <zenhttp/httpclient.h> +#include <zenhttp/httpcommon.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> #include <fmt/format.h> ZEN_THIRD_PARTY_INCLUDES_END +namespace zen { + +struct BodyLogFormatter +{ +private: + std::string_view ResponseText; + zen::ExtendableStringBuilder<128> ModifiedResponse; + +public: + explicit BodyLogFormatter(std::string_view InResponseText) : ResponseText(InResponseText) + { + using namespace std::literals; + + const int TextSizeLimit = 1024; + + // Trim invalid UTF8 + + auto InvalidIt = zen::FindFirstInvalidUtf8Byte(ResponseText); + + if (InvalidIt != end(ResponseText)) + { + ResponseText = ResponseText.substr(0, InvalidIt - begin(ResponseText)); + } + + if (ResponseText.empty()) + { + ResponseText = "<suppressed non-text response>"sv; + } + + if (ResponseText.size() > TextSizeLimit) + { + const auto TruncatedString = "[truncated response] "sv; + ModifiedResponse.Append(TruncatedString); + ModifiedResponse.Append(ResponseText.data(), TextSizeLimit - TruncatedString.size()); + + ResponseText = ModifiedResponse; + } + } + + inline std::string_view GetText() const { return ResponseText; } +}; + +} // namespace zen + template<> struct fmt::formatter<cpr::Response> { @@ -23,15 +68,19 @@ struct fmt::formatter<cpr::Response> { using namespace std::literals; - if (Response.status_code == 200 || Response.status_code == 201) + zen::NiceTimeSpanMs NiceResponseTime(uint64_t(Response.elapsed * 1000)); + + if (zen::IsHttpSuccessCode(Response.status_code)) { return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", + "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}", Response.url.str(), Response.status_code, + Response.error.message, + int(Response.error.code), Response.uploaded_bytes, Response.downloaded_bytes, - Response.elapsed); + NiceResponseTime.c_str()); } else { @@ -45,27 +94,35 @@ struct fmt::formatter<cpr::Response> zen::ExtendableStringBuilder<256> Sb; std::string_view Json = Obj.ToJson(Sb).ToView(); - return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", - Response.url.str(), - Response.status_code, - Response.uploaded_bytes, - Response.downloaded_bytes, - Response.elapsed, - Json, - Response.reason); + return fmt::format_to( + Ctx.out(), + "Url: {}, Status: {}, Error: '{}' ({}). Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.error.message, + int(Response.error.code), + Response.uploaded_bytes, + Response.downloaded_bytes, + NiceResponseTime.c_str(), + Json, + Response.reason); } else { - return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reponse: '{}', Reason: '{}'", - Response.url.str(), - Response.status_code, - Response.uploaded_bytes, - Response.downloaded_bytes, - Response.elapsed, - Response.text, - Response.reason); + zen::BodyLogFormatter Body(Response.text); + + return fmt::format_to( + Ctx.out(), + "Url: {}, Status: {}, Error: '{}' ({}), Bytes: {}/{} (Up/Down), Elapsed: {}, Response: '{}', Reason: '{}'", + Response.url.str(), + Response.status_code, + Response.error.message, + int(Response.error.code), + Response.uploaded_bytes, + Response.downloaded_bytes, + NiceResponseTime.c_str(), + Body.GetText(), + Response.reason); } } } diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 1cf77d794..c991a71ea 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -34,7 +34,7 @@ struct HttpClientAccessToken using Clock = std::chrono::system_clock; using TimePoint = Clock::time_point; - static constexpr int64_t ExpireMarginInSeconds = 30; + static constexpr int64_t ExpireMarginInSeconds = 60 * 5; std::string Value; TimePoint ExpireTime; @@ -60,9 +60,6 @@ struct HttpClientSettings class HttpClient { public: - struct Settings - { - }; HttpClient(std::string_view BaseUri, const HttpClientSettings& Connectionsettings = {}); ~HttpClient(); @@ -100,7 +97,7 @@ public: HttpResponseCode StatusCode = HttpResponseCode::ImATeapot; IoBuffer ResponsePayload; // Note: this also includes the content type - // Contains the reponse headers + // Contains the response headers KeyValueMap Header; // The number of bytes sent as part of the request @@ -180,6 +177,7 @@ public: LoggerRef Logger() { return m_Log; } std::string_view GetBaseUri() const { return m_BaseUri; } bool Authenticate(); + std::string_view GetSessionId() const { return m_SessionId; } private: const std::optional<HttpClientAccessToken> GetAccessToken(); diff --git a/src/zenhttp/include/zenhttp/httpclientauth.h b/src/zenhttp/include/zenhttp/httpclientauth.h index aa07620ca..5b9b9d305 100644 --- a/src/zenhttp/include/zenhttp/httpclientauth.h +++ b/src/zenhttp/include/zenhttp/httpclientauth.h @@ -3,6 +3,7 @@ #pragma once #include <zenhttp/httpclient.h> +#include <optional> namespace zen { @@ -24,6 +25,9 @@ namespace httpclientauth { std::function<HttpClientAccessToken()> CreateFromOpenIdProvider(AuthMgr& AuthManager, std::string_view OpenIdProvider); std::function<HttpClientAccessToken()> CreateFromDefaultOpenIdProvider(AuthMgr& AuthManager); + + std::optional<std::function<HttpClientAccessToken()>> CreateFromOidcTokenExecutable(const std::filesystem::path& OidcExecutablePath, + std::string_view CloudHost); } // namespace httpclientauth } // namespace zen diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index 7b87cb84b..03e547bf3 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -184,12 +184,19 @@ public: virtual void Close() = 0; }; +struct HttpServerPluginConfig +{ + std::string PluginName; + std::vector<std::pair<std::string, std::string>> PluginOptions; +}; + struct HttpServerConfig { - bool IsDedicatedServer = false; // Should be set to true for shared servers - std::string ServerClass; // Choice of HTTP server implementation - bool ForceLoopback = false; - unsigned int ThreadCount = 0; + bool IsDedicatedServer = false; // Should be set to true for shared servers + std::string ServerClass; // Choice of HTTP server implementation + std::vector<HttpServerPluginConfig> PluginConfigs; + bool ForceLoopback = false; + unsigned int ThreadCount = 0; struct { @@ -208,7 +215,7 @@ class HttpRouterRequest public: HttpRouterRequest(HttpServerRequest& Request) : m_HttpRequest(Request) {} - ZENCORE_API std::string GetCapture(uint32_t Index) const; + std::string_view GetCapture(uint32_t Index) const; inline HttpServerRequest& ServerRequest() { return m_HttpRequest; } private: @@ -220,12 +227,14 @@ private: friend class HttpRequestRouter; }; -inline std::string +inline std::string_view HttpRouterRequest::GetCapture(uint32_t Index) const { ZEN_ASSERT(Index < m_Match.size()); - return m_Match[Index]; + const auto& Match = m_Match[Index]; + + return std::string_view(&*Match.first, Match.second - Match.first); } /** HTTP request router helper diff --git a/src/zenhttp/packageformat.cpp b/src/zenhttp/packageformat.cpp index 676fc73fd..9d423ecbc 100644 --- a/src/zenhttp/packageformat.cpp +++ b/src/zenhttp/packageformat.cpp @@ -19,6 +19,8 @@ #include <span> #include <vector> +#include <EASTL/fixed_vector.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #endif @@ -31,6 +33,10 @@ namespace zen { const std::string_view HandlePrefix(":?#:"); +typedef eastl::fixed_vector<IoBuffer, 16> IoBufferVec_t; + +IoBufferVec_t FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle); + std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle) { @@ -42,10 +48,18 @@ FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle) return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessHandle); } +std::vector<IoBuffer> +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) +{ + auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle); + return std::vector<IoBuffer>(begin(Vec), end(Vec)); +} + CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) { - return CompositeBuffer(FormatPackageMessage(Data, Flags, TargetProcessHandle)); + auto Vec = FormatPackageMessageInternal(Data, Flags, TargetProcessHandle); + return CompositeBuffer(std::span{begin(Vec), end(Vec)}); } static void @@ -54,7 +68,7 @@ MarshalLocal(CbAttachmentEntry*& AttachmentInfo, CbAttachmentReferenceHeader& LocalRef, const IoHash& AttachmentHash, bool IsCompressed, - std::vector<IoBuffer>& ResponseBuffers) + IoBufferVec_t& ResponseBuffers) { IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size()); @@ -146,8 +160,8 @@ IsLocalRef(tsl::robin_map<void*, std::string>& FileNameMap, return true; }; -std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) +IoBufferVec_t +FormatPackageMessageInternal(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) { ZEN_TRACE_CPU("FormatPackageMessage"); @@ -177,7 +191,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProce #endif // ZEN_PLATFORM_WINDOWS const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); - std::vector<IoBuffer> ResponseBuffers; + IoBufferVec_t ResponseBuffers; ResponseBuffers.reserve(2 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers @@ -265,11 +279,10 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProce { IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ZEN_ASSERT(ObjIoBuffer.GetSize() > 0); - ResponseBuffers.emplace_back(std::move(ObjIoBuffer)); - *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; + ResponseBuffers.emplace_back(std::move(ObjIoBuffer)); } else if (const CompositeBuffer& AttachmentBinary = Attachment.AsCompositeBinary()) { @@ -486,30 +499,25 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (Entry.Flags & CbAttachmentEntry::kIsObject) { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); + if (!CompBuf) + { + // First payload is always a compact binary object + MalformedAttachments.push_back( + std::make_pair(i, + fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", + AttachmentBuffer.GetSize(), + Entry.AttachmentHash))); + } + CbObject AttachmentObject = LoadCompactBinaryObject(std::move(CompBuf)); if (i == 0) { - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (CompBuf) - { - Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); - } - else - { - // First payload is always a compact binary object - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } + // First payload is always a compact binary object + Package.SetObject(AttachmentObject); } else { - MalformedAttachments.push_back(std::make_pair( - i, - fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); + Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash)); } } else @@ -533,17 +541,14 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (Entry.Flags & CbAttachmentEntry::kIsObject) { + CbObject AttachmentObject = LoadCompactBinaryObject(AttachmentBuffer); if (i == 0) { - Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); + Package.SetObject(AttachmentObject); } else { - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); + Attachments.emplace_back(CbAttachment(AttachmentObject, Entry.AttachmentHash)); } } else if (AttachmentSize > 0) diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index fe59e3a6f..c1b7294c9 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -678,7 +678,7 @@ struct HttpAcceptor if (BindErrorCode == asio::error::address_in_use) { // Do a retry after a short sleep on same port just to be sure - ZEN_INFO("Desired port %d is in use, retrying", BasePort); + ZEN_INFO("Desired port {} is in use, retrying", BasePort); Sleep(100); m_Acceptor.bind(asio::ip::tcp::endpoint(BindAddress, EffectivePort), BindErrorCode); } @@ -697,13 +697,20 @@ struct HttpAcceptor { ZEN_ERROR("Unable open asio service, error '{}'", BindErrorCode.message()); } - else if (BindAddress.is_loopback()) + else { - m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode); - m_UseAlternateProtocolAcceptor = true; - ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts", - "localhost", - EffectivePort); + if (EffectivePort != BasePort) + { + ZEN_WARN("Desired port {} is in use, remapped to port {}", BasePort, EffectivePort); + } + if (BindAddress.is_loopback()) + { + m_AlternateProtocolAcceptor.bind(asio::ip::tcp::endpoint(asio::ip::address_v4::loopback(), EffectivePort), BindErrorCode); + m_UseAlternateProtocolAcceptor = true; + ZEN_INFO("Registered local-only handler 'http://{}:{}/' - this is not accessible from remote hosts", + "localhost", + EffectivePort); + } } #if ZEN_PLATFORM_WINDOWS diff --git a/src/zenhttp/servers/httpmulti.cpp b/src/zenhttp/servers/httpmulti.cpp index 2a6a90d2e..f4dc1e15b 100644 --- a/src/zenhttp/servers/httpmulti.cpp +++ b/src/zenhttp/servers/httpmulti.cpp @@ -103,6 +103,10 @@ HttpMultiServer::RequestExit() void HttpMultiServer::Close() { + for (auto& Server : m_Servers) + { + Server->Close(); + } } void diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 87128c0c9..62dab02c4 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -16,6 +16,8 @@ #include <zencore/trace.h> #include <zenhttp/packageformat.h> +#include <EASTL/fixed_vector.h> + #if ZEN_WITH_HTTPSYS # define _WINSOCKAPI_ # include <zencore/windows.h> @@ -381,14 +383,14 @@ public: void SuppressResponseBody(); // typically used for HEAD requests private: - std::vector<HTTP_DATA_CHUNK> m_HttpDataChunks; - uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes - uint16_t m_ResponseCode = 0; - uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists - uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends - bool m_IsInitialResponse = true; - HttpContentType m_ContentType = HttpContentType::kBinary; - std::vector<IoBuffer> m_DataBuffers; + eastl::fixed_vector<HTTP_DATA_CHUNK, 16> m_HttpDataChunks; + uint64_t m_TotalDataSize = 0; // Sum of all chunk sizes + uint16_t m_ResponseCode = 0; + uint32_t m_NextDataChunkOffset = 0; // Cursor used for very large chunk lists + uint32_t m_RemainingChunkCount = 0; // Backlog for multi-call sends + bool m_IsInitialResponse = true; + HttpContentType m_ContentType = HttpContentType::kBinary; + eastl::fixed_vector<IoBuffer, 16> m_DataBuffers; void InitializeForPayload(uint16_t ResponseCode, std::span<IoBuffer> Blobs); }; @@ -533,7 +535,14 @@ HttpMessageResponseRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfB if (IoResult != NO_ERROR) { - ZEN_WARN("response aborted due to error: {}", GetSystemErrorAsString(IoResult)); + ZEN_WARN("response '{}' ({}) aborted after transfering '{}', {} out of {} bytes, reason: {} ({})", + ReasonStringForHttpResultCode(m_ResponseCode), + m_ResponseCode, + ToString(m_ContentType), + NumberOfBytesTransferred, + m_TotalDataSize, + GetSystemErrorAsString(IoResult), + IoResult); // if one transmit failed there's really no need to go on return nullptr; @@ -682,7 +691,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) ); } - auto EmitReponseDetails = [&](StringBuilderBase& ResponseDetails) -> void { + auto EmitResponseDetails = [&](StringBuilderBase& ResponseDetails) -> void { for (int i = 0; i < ThisRequestChunkCount; ++i) { const HTTP_DATA_CHUNK Chunk = m_HttpDataChunks[ThisRequestChunkOffset + i]; @@ -765,7 +774,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) // Emit diagnostics ExtendableStringBuilder<256> ResponseDetails; - EmitReponseDetails(ResponseDetails); + EmitResponseDetails(ResponseDetails); ZEN_WARN("failed to send HTTP response (error {}: '{}'), request URL: '{}', ({}.{}) response: {}", SendResult, diff --git a/src/zenhttp/transports/dlltransport.cpp b/src/zenhttp/transports/dlltransport.cpp index e09e62ec5..fb3dd23b5 100644 --- a/src/zenhttp/transports/dlltransport.cpp +++ b/src/zenhttp/transports/dlltransport.cpp @@ -21,18 +21,31 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// +class DllTransportLogger : public TransportLogger, public RefCounted +{ +public: + DllTransportLogger(std::string_view PluginName); + virtual ~DllTransportLogger() = default; + + void LogMessage(LogLevel Level, const char* Message) override; + +private: + std::string m_PluginName; +}; + struct LoadedDll { std::string Name; std::filesystem::path LoadedFromPath; + DllTransportLogger* Logger = nullptr; Ref<TransportPlugin> Plugin; }; class DllTransportPluginImpl : public DllTransportPlugin, RefCounted { public: - DllTransportPluginImpl(); - ~DllTransportPluginImpl(); + DllTransportPluginImpl() = default; + ~DllTransportPluginImpl() = default; virtual uint32_t AddRef() const override; virtual uint32_t Release() const override; @@ -42,7 +55,7 @@ public: virtual const char* GetDebugName() override; virtual bool IsAvailable() override; - virtual void LoadDll(std::string_view Name) override; + virtual bool LoadDll(std::string_view Name) override; virtual void ConfigureDll(std::string_view Name, const char* OptionTag, const char* OptionValue) override; private: @@ -51,12 +64,27 @@ private: std::vector<LoadedDll> m_Transports; }; -DllTransportPluginImpl::DllTransportPluginImpl() +DllTransportLogger::DllTransportLogger(std::string_view PluginName) : m_PluginName(PluginName) { } -DllTransportPluginImpl::~DllTransportPluginImpl() +void +DllTransportLogger::LogMessage(LogLevel PluginLogLevel, const char* Message) { + logging::level::LogLevel Level; + // clang-format off + switch (PluginLogLevel) + { + case LogLevel::Trace: Level = logging::level::Trace; break; + case LogLevel::Debug: Level = logging::level::Debug; break; + case LogLevel::Info: Level = logging::level::Info; break; + case LogLevel::Warn: Level = logging::level::Warn; break; + case LogLevel::Err: Level = logging::level::Err; break; + case LogLevel::Critical: Level = logging::level::Critical; break; + default: Level = logging::level::Off; break; + } + // clang-format on + ZEN_LOG(Log(), Level, "[{}] {}", m_PluginName, Message) } uint32_t @@ -109,6 +137,7 @@ DllTransportPluginImpl::Shutdown() try { Transport.Plugin->Shutdown(); + Transport.Logger->Release(); } catch (const std::exception&) { @@ -143,42 +172,73 @@ DllTransportPluginImpl::ConfigureDll(std::string_view Name, const char* OptionTa } } -void +bool DllTransportPluginImpl::LoadDll(std::string_view Name) { RwLock::ExclusiveLockScope _(m_Lock); - ExtendableStringBuilder<128> DllPath; - DllPath << Name << ".dll"; + ExtendableStringBuilder<1024> DllPath; + DllPath << Name; + if (!Name.ends_with(".dll")) + { + DllPath << ".dll"; + } + + std::string FileName = std::filesystem::path(DllPath.c_str()).filename().replace_extension().string(); + HMODULE DllHandle = LoadLibraryA(DllPath.c_str()); if (!DllHandle) { - std::error_code Ec = MakeErrorCodeFromLastError(); - - throw std::system_error(Ec, fmt::format("failed to load transport DLL from '{}'", DllPath)); + ZEN_WARN("Failed to load transport DLL from '{}' due to '{}'", DllPath, GetLastErrorAsString()) + return false; } - TransportPlugin* CreateTransportPlugin(); + PfnGetTransportPluginVersion GetVersion = (PfnGetTransportPluginVersion)GetProcAddress(DllHandle, "GetTransportPluginVersion"); + PfnCreateTransportPlugin CreatePlugin = (PfnCreateTransportPlugin)GetProcAddress(DllHandle, "CreateTransportPlugin"); + + uint32_t APIVersion = 0; + uint32_t PluginVersion = 0; + + if (GetVersion) + { + GetVersion(&APIVersion, &PluginVersion); + } - PfnCreateTransportPlugin CreatePlugin = (PfnCreateTransportPlugin)GetProcAddress(DllHandle, "CreateTransportPlugin"); + const bool bValidApiVersion = APIVersion == kTransportApiVersion; - if (!CreatePlugin) + if (!GetVersion || !CreatePlugin || !bValidApiVersion) { std::error_code Ec = MakeErrorCodeFromLastError(); FreeLibrary(DllHandle); - throw std::system_error(Ec, fmt::format("API mismatch detected in transport DLL loaded from '{}'", DllPath)); + if (GetVersion && !bValidApiVersion) + { + ZEN_WARN("Failed to load transport DLL from '{}' due to invalid API version {}, supported API version is {}", + DllPath, + APIVersion, + kTransportApiVersion) + } + else + { + ZEN_WARN("Failed to load transport DLL from '{}' due to not finding GetTransportPluginVersion or CreateTransportPlugin", + DllPath) + } + + return false; } LoadedDll NewDll; NewDll.Name = Name; NewDll.LoadedFromPath = DllPath.c_str(); - NewDll.Plugin = CreatePlugin(); + NewDll.Logger = new DllTransportLogger(FileName); + NewDll.Logger->AddRef(); + NewDll.Plugin = CreatePlugin(NewDll.Logger); m_Transports.emplace_back(std::move(NewDll)); + return true; } DllTransportPlugin* diff --git a/src/zenhttp/transports/dlltransport.h b/src/zenhttp/transports/dlltransport.h index 9346a10ce..c49f888da 100644 --- a/src/zenhttp/transports/dlltransport.h +++ b/src/zenhttp/transports/dlltransport.h @@ -15,7 +15,7 @@ namespace zen { class DllTransportPlugin : public TransportPlugin { public: - virtual void LoadDll(std::string_view Name) = 0; + virtual bool LoadDll(std::string_view Name) = 0; virtual void ConfigureDll(std::string_view Name, const char* OptionTag, const char* OptionValue) = 0; }; |