diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenhttp/httpclient.cpp | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 836 |
1 files changed, 628 insertions, 208 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 8052a8fd5..a2d323b5e 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -12,13 +12,19 @@ #include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zencore/memory/memory.h> #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> #include <zencore/string.h> -#include <zencore/testing.h> #include <zencore/trace.h> +#if ZEN_WITH_TESTS +# include <zencore/basicfile.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -42,9 +48,13 @@ namespace detail { class TempPayloadFile { public: + TempPayloadFile(const TempPayloadFile&) = delete; + TempPayloadFile& operator=(const TempPayloadFile&) = delete; + TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} ~TempPayloadFile() { + ZEN_TRACE_CPU("TempPayloadFile::Close"); try { if (m_FileHandle) @@ -85,8 +95,9 @@ namespace detail { } } - std::error_code Open(const std::filesystem::path& TempFolderPath) + std::error_code Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) { + ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | @@ -126,11 +137,14 @@ namespace detail { #endif // ZEN_PLATFORM_WINDOWS m_FileHandle = FileHandle; + PrepareFileForScatteredWrite(m_FileHandle, FinalSize); + return {}; } std::error_code Write(std::string_view DataString) { + ZEN_TRACE_CPU("TempPayloadFile::Write"); const uint8_t* DataPtr = (const uint8_t*)DataString.data(); size_t DataSize = DataString.size(); if (DataSize >= CacheBufferSize) @@ -165,6 +179,7 @@ namespace detail { IoBuffer DetachToIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -180,6 +195,7 @@ namespace detail { IoBuffer BorrowIoBuffer() { + ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); @@ -193,6 +209,7 @@ namespace detail { uint64_t GetSize() const { return m_WriteOffset; } void ResetWritePos(uint64_t WriteOffset) { + ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); Flush(); m_WriteOffset = WriteOffset; } @@ -200,6 +217,7 @@ namespace detail { private: std::error_code Flush() { + ZEN_TRACE_CPU("TempPayloadFile::Flush"); if (m_CacheBufferOffset == 0) { return {}; @@ -211,6 +229,7 @@ namespace detail { std::error_code AppendData(const void* Data, uint64_t Size) { + ZEN_TRACE_CPU("TempPayloadFile::AppendData"); ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; @@ -261,6 +280,167 @@ namespace detail { std::uint64_t m_CacheBufferOffset = 0; }; + class BufferedReadFileStream + { + public: + BufferedReadFileStream(const BufferedReadFileStream&) = delete; + BufferedReadFileStream& operator=(const BufferedReadFileStream&) = delete; + + BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) + : m_FileHandle(FileHandle) + , m_FileSize(FileSize) + , m_FileEnd(FileOffset + FileSize) + , m_BufferSize(Min(BufferSize, FileSize)) + , m_FileOffset(FileOffset) + { + } + + ~BufferedReadFileStream() { Memory::Free(m_Buffer); } + void Read(void* Data, uint64_t Size) + { + ZEN_ASSERT(Data != nullptr); + if (Size > m_BufferSize) + { + Read(Data, Size, m_FileOffset); + m_FileOffset += Size; + return; + } + uint8_t* WritePtr = ((uint8_t*)Data); + uint64_t Begin = m_FileOffset; + uint64_t End = m_FileOffset + Size; + ZEN_ASSERT(m_FileOffset >= m_BufferStart); + if (m_FileOffset < m_BufferEnd) + { + ZEN_ASSERT(m_Buffer != nullptr); + uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); + Begin += Count; + if (Begin == End) + { + m_FileOffset = End; + return; + } + } + if (End == m_FileEnd) + { + Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); + } + else + { + if (!m_Buffer) + { + m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); + m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize)); + } + m_BufferStart = Begin; + m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); + Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); + ZEN_ASSERT(Begin + Count == End); + } + m_FileOffset = End; + } + + private: + void Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) + { + const uint64_t MaxChunkSize = 1u * 1024 * 1024; + std::error_code Ec; + ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); + + if (Ec) + { + std::error_code DummyEc; + throw std::system_error( + Ec, + fmt::format( + "HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", + FileOffset, + BytesToRead, + PathFromHandle(m_FileHandle, DummyEc).generic_string(), + m_FileSize)); + } + } + + void* m_FileHandle = nullptr; + const uint64_t m_FileSize = 0; + const uint64_t m_FileEnd = 0; + uint64_t m_BufferSize = 0; + uint8_t* m_Buffer = nullptr; + uint64_t m_BufferStart = 0; + uint64_t m_BufferEnd = 0; + uint64_t m_FileOffset = 0; + }; + + class CompositeBufferReadStream + { + public: + CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) + : m_Data(Data) + , m_BufferSize(BufferSize) + , m_SegmentIndex(0) + , m_BytesLeftInSegment(0) + { + } + uint64_t Read(void* Data, uint64_t Size) + { + uint64_t Result = 0; + uint8_t* WritePtr = (uint8_t*)Data; + while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) + { + if (m_BytesLeftInSegment == 0) + { + const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + m_BufferSize); + } + else + { + m_SegmentMemoryBuffer = Segment.GetView(); + } + m_BytesLeftInSegment = Segment.GetSize(); + } + uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); + if (m_SegmentDiskBuffer) + { + m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); + } + else + { + ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); + memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); + m_SegmentMemoryBuffer.MidInline(BytesToRead); + } + WritePtr += BytesToRead; + Size -= BytesToRead; + Result += BytesToRead; + + m_BytesLeftInSegment -= BytesToRead; + if (m_BytesLeftInSegment == 0) + { + m_SegmentDiskBuffer.reset(); + m_SegmentMemoryBuffer.Reset(); + m_SegmentIndex++; + } + } + return Result; + } + + private: + const CompositeBuffer& m_Data; + const uint64_t m_BufferSize; + size_t m_SegmentIndex; + std::unique_ptr<BufferedReadFileStream> m_SegmentDiskBuffer; + MemoryView m_SegmentMemoryBuffer; + uint64_t m_BytesLeftInSegment; + }; + } // namespace detail ////////////////////////////////////////////////////////////////////////// @@ -282,7 +462,7 @@ AsCprBody(const IoBuffer& Obj) ////////////////////////////////////////////////////////////////////////// static HttpClient::Response -ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) +ResponseWithPayload(std::string_view SessionId, cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results // into buffer directly @@ -297,7 +477,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) { - ZEN_WARN("HttpClient request failed: {}", HttpResponse); + ZEN_WARN("HttpClient request failed (session: {}): {}", SessionId, HttpResponse); } return HttpClient::Response{.StatusCode = WorkResponseCode, @@ -309,12 +489,16 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp } static HttpClient::Response -CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) +CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); if (HttpResponse.error) { - ZEN_WARN("HttpClient client error: {}", HttpResponse); + if (HttpResponse.error.code != cpr::ErrorCode::OPERATION_TIMEDOUT && + HttpResponse.error.code != cpr::ErrorCode::CONNECTION_FAILURE && HttpResponse.error.code != cpr::ErrorCode::REQUEST_CANCELLED) + { + ZEN_WARN("HttpClient client failure (session: {}): {}", SessionId, HttpResponse); + } // Client side failure code return HttpClient::Response{ @@ -339,6 +523,7 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) else { return ResponseWithPayload( + SessionId, HttpResponse, WorkResponseCode, Payload ? std::move(Payload) : IoBufferBuilder::MakeCloneFromMemory(HttpResponse.text.data(), HttpResponse.text.size())); @@ -352,9 +537,10 @@ ShouldRetry(const cpr::Response& Response) { case cpr::ErrorCode::OK: break; - case cpr::ErrorCode::OPERATION_TIMEDOUT: + case cpr::ErrorCode::INTERNAL_ERROR: case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: case cpr::ErrorCode::NETWORK_SEND_FAILURE: + case cpr::ErrorCode::OPERATION_TIMEDOUT: return true; default: return false; @@ -364,6 +550,7 @@ ShouldRetry(const cpr::Response& Response) case HttpResponseCode::RequestTimeout: case HttpResponseCode::TooManyRequests: case HttpResponseCode::InternalServerError: + case HttpResponseCode::BadGateway: case HttpResponseCode::ServiceUnavailable: case HttpResponseCode::GatewayTimeout: return true; @@ -375,6 +562,7 @@ ShouldRetry(const cpr::Response& Response) static bool ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile>& PayloadFile) { + ZEN_TRACE_CPU("ValidatePayload"); IoBuffer ResponseBuffer = (Response.text.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer() : IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()); @@ -396,6 +584,11 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } } + if (Response.status_code == (long)HttpResponseCode::PartialContent) + { + return true; + } + if (auto JupiterHash = Response.header.find("X-Jupiter-IoHash"); JupiterHash != Response.header.end()) { IoHash ExpectedPayloadHash; @@ -448,22 +641,40 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } static cpr::Response -DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount) +DoWithRetry( + std::string_view SessionId, + std::function<cpr::Response()>&& Func, + uint8_t RetryCount, + std::function<bool(cpr::Response& Result)>&& Validate = [](cpr::Response&) { return true; }) { uint8_t Attempt = 0; cpr::Response Result = Func(); - while (Attempt < RetryCount && ShouldRetry(Result)) + while (Attempt < RetryCount) { + if (!ShouldRetry(Result)) + { + if (Result.error || !IsHttpSuccessCode(Result.status_code)) + { + break; + } + if (Validate(Result)) + { + break; + } + } Sleep(100 * (Attempt + 1)); Attempt++; - ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; } static cpr::Response -DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempPayloadFile>& PayloadFile, uint8_t RetryCount) +DoWithRetry(std::string_view SessionId, + std::function<cpr::Response()>&& Func, + std::unique_ptr<detail::TempPayloadFile>& PayloadFile, + uint8_t RetryCount) { uint8_t Attempt = 0; cpr::Response Result = Func(); @@ -482,7 +693,7 @@ DoWithRetry(std::function<cpr::Response()>&& Func, std::unique_ptr<detail::TempP } Sleep(100 * (Attempt + 1)); Attempt++; - ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + ZEN_INFO("{} Attempt {}/{}", CommonResponse(SessionId, std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); Result = Func(); } return Result; @@ -511,12 +722,14 @@ struct HttpClient::Impl : public RefCounted inline cpr::Session* operator->() const { return CprSession; } inline cpr::Response Get() { + ZEN_TRACE_CPU("HttpClient::Impl::Get"); cpr::Response Result = CprSession->Get(); ZEN_TRACE("GET {}", Result); return Result; } inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Download"); if (Header) { CprSession->SetHeaderCallback(std::move(Header.value())); @@ -529,12 +742,14 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Head() { + ZEN_TRACE_CPU("HttpClient::Impl::Head"); cpr::Response Result = CprSession->Head(); ZEN_TRACE("HEAD {}", Result); return Result; } inline cpr::Response Put(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Put"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -546,6 +761,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Post(std::optional<cpr::ReadCallback>&& Read = {}) { + ZEN_TRACE_CPU("HttpClient::Impl::Post"); if (Read) { CprSession->SetReadCallback(std::move(Read.value())); @@ -557,6 +773,7 @@ struct HttpClient::Impl : public RefCounted } inline cpr::Response Delete() { + ZEN_TRACE_CPU("HttpClient::Impl::Delete"); cpr::Response Result = CprSession->Delete(); ZEN_TRACE("DELETE {}", Result); return Result; @@ -596,6 +813,7 @@ HttpClient::Impl::Impl(LoggerRef Log) : m_Log(Log) HttpClient::Impl::~Impl() { + ZEN_TRACE_CPU("HttpClient::Impl::~Impl"); m_SessionLock.WithExclusiveLock([&] { for (auto CprSession : m_Sessions) { @@ -614,6 +832,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, const std::string_view SessionId, std::optional<HttpClientAccessToken> AccessToken) { + ZEN_TRACE_CPU("HttpClient::Impl::AllocSession"); cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { if (!m_Sessions.empty()) @@ -670,6 +889,7 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, void HttpClient::Impl::ReleaseSession(cpr::Session* CprSession) { + ZEN_TRACE_CPU("HttpClient::Impl::ReleaseSession"); CprSession->SetUrl({}); CprSession->SetHeader({}); CprSession->SetBody({}); @@ -694,6 +914,7 @@ HttpClient::~HttpClient() bool HttpClient::Authenticate() { + ZEN_TRACE_CPU("HttpClient::Authenticate"); std::optional<HttpClientAccessToken> Token = GetAccessToken(); if (!Token) { @@ -705,6 +926,7 @@ HttpClient::Authenticate() const std::optional<HttpClientAccessToken> HttpClient::GetAccessToken() { + ZEN_TRACE_CPU("HttpClient::GetAccessToken"); if (!m_ConnectionSettings.AccessTokenProvider.has_value()) { return {}; @@ -829,15 +1051,18 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap { ZEN_TRACE_CPU("HttpClient::Put"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -845,31 +1070,40 @@ HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Put"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - {{"Content-Length", "0"}}, - Parameters, - m_SessionId, - GetAccessToken()); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse(m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + {{"Content-Length", "0"}}, + Parameters, + m_SessionId, + GetAccessToken()); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Get(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount, + [](cpr::Response& Result) { + std::unique_ptr<detail::TempPayloadFile> NoTempFile; + return ValidatePayload(Result, NoTempFile); + })); } HttpClient::Response @@ -877,13 +1111,16 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Head(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -891,13 +1128,16 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return Sess.Delete(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -905,13 +1145,16 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -925,16 +1168,32 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) + { + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; + }; + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + Sess->SetBody(AsCprBody(Payload)); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -942,16 +1201,19 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); - return Sess.Post(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -965,24 +1227,23 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten { ZEN_TRACE_CPU("HttpClient::Post"); - return CommonResponse(DoWithRetry( - [&]() { - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); + return true; + }; + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -990,29 +1251,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - Sess->SetBody(AsCprBody(Payload)); - return Sess.Put(); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) + { + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + Sess->SetBody(AsCprBody(Payload)); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1020,24 +1284,23 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - return CommonResponse(DoWithRetry( - [&]() { - Impl::Session Sess = - m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->UpdateHeader({HeaderContentType(ContentType)}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - }, - m_ConnectionSettings.RetryCount)); + return CommonResponse( + m_SessionId, + DoWithRetry( + m_SessionId, + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); + return true; + }; + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -1048,6 +1311,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::string PayloadString; std::unique_ptr<detail::TempPayloadFile> PayloadFile; cpr::Response Response = DoWithRetry( + m_SessionId, [&]() { auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> { size_t DelimiterPos = header.find(':'); @@ -1087,6 +1351,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold return true; }; + uint64_t RequestedContentLength = (uint64_t)-1; + if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) + { + if (RangeIt->second.starts_with("bytes")) + { + size_t RangeStartPos = RangeIt->second.find('=', 5); + if (RangeStartPos != std::string::npos) + { + RangeStartPos++; + size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos); + if (RangeSplitPos != std::string::npos) + { + std::optional<size_t> RequestedRangeStart = + ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos)); + std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1)); + if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) + { + RequestedContentLength = RequestedRangeEnd.value() - 1; + } + } + } + } + } + cpr::Response Response; { std::vector<std::pair<std::string, std::string>> ReceivedHeaders; @@ -1094,13 +1382,13 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::pair<std::string, std::string> Header = GetHeader(header); if (Header.first == "Content-Length"sv) { - std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); - if (ContentSize.has_value()) + std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second); + if (ContentLength.has_value()) { - if (ContentSize.value() > 1024 * 1024) + if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique<detail::TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); + std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value()); if (Ec) { ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", @@ -1111,7 +1399,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } else { - PayloadString.reserve(ContentSize.value()); + PayloadString.reserve(ContentLength.value()); } } } @@ -1157,85 +1445,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold auto It = Response.header.find("Content-Length"); if (It != Response.header.end()) { - std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); - if (ContentLength) - { - std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; - auto HeaderCallback = [&](std::string header, intptr_t) { - std::pair<std::string, std::string> Header = GetHeader(header); - if (!Header.first.empty()) - { - ReceivedHeaders.emplace_back(std::move(Header)); - } + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Header.first == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Header.second.starts_with("bytes "sv)) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Header.second.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } - return 1; - }; + return 0; + } + return 1; + }; - KeyValueMap HeadersWithRange(AdditionalHeader); - do - { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - std::string Range = fmt::format("bytes={}-{}", DownloadedSize, ContentLength.value()); - if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + uint64_t ContentLength = RequestedContentLength; + if (ContentLength == uint64_t(-1)) + { + if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value()) { - if (RangeIt->second == Range) - { - // If we didn't make any progress, abort - break; - } + ContentLength = ParsedContentLength.value(); } - HeadersWithRange.Entries.insert_or_assign("Range", Range); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - HeadersWithRange, - {}, - m_SessionId, - GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); - for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + } + + std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) { - Response.header.insert_or_assign(H.first, H.second); + // If we didn't make any progress, abort + break; } - ReceivedHeaders.clear(); - } while (ShouldResume(Response)); - } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } + ReceivedHeaders.clear(); + } while (ShouldResume(Response)); } } } @@ -1249,7 +1542,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold PayloadFile, m_ConnectionSettings.RetryCount); - return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); + return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } ////////////////////////////////////////////////////////////////////////// @@ -1361,6 +1654,133 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix) #if ZEN_WITH_TESTS +namespace testutil { + IoHash HashComposite(const CompositeBuffer& Payload) + { + IoHashStream Hasher; + const uint64_t PayloadSize = Payload.GetSize(); + std::vector<uint8_t> Buffer(64u * 1024u); + detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); + for (uint64_t Offset = 0; Offset < PayloadSize;) + { + uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + }; + + IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) + { + IoHashStream Hasher; + std::vector<uint8_t> Buffer(64u * 1024u); + detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); + for (uint64_t Offset = 0; Offset < FileSize;) + { + uint64_t Count = Min(64u * 1024u, FileSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + } + +} // namespace testutil + +TEST_CASE("responseformat") +{ + using namespace std::literals; + + SUBCASE("identity") + { + BodyLogFormatter _{"abcd"}; + CHECK_EQ(_.GetText(), "abcd"sv); + } + + SUBCASE("very long") + { + std::string_view LongView = + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz" + "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; + + BodyLogFormatter _{LongView}; + + CHECK(_.GetText().size() < LongView.size()); + CHECK(_.GetText().starts_with("[truncated"sv)); + } + + SUBCASE("invalid text") + { + std::string_view BadText = "totobaba\xff\xfe"; + + BodyLogFormatter _{BadText}; + + CHECK_EQ(_.GetText(), "totobaba"); + } +} + +TEST_CASE("BufferedReadFileStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + CHECK(DiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); + CHECK(Partial.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK(SmallDiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), + testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); +} + +TEST_CASE("CompositeBufferReadStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer MemoryBuffer1 = CreateRandomBlob(64); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); + + IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); + + IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); + + IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); + + IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); + + CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), + SharedBuffer(std::move(DiskBuffer1)), + SharedBuffer(std::move(DiskBuffer2)), + SharedBuffer(std::move(MemoryBuffer2)), + SharedBuffer(std::move(DiskBuffer3))); + CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); +} + TEST_CASE("httpclient") { using namespace std::literals; |