diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-02 19:14:52 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-02 19:14:52 +0200 |
| commit | 33d443f5361d007f4971bf0d98585b81ee691437 (patch) | |
| tree | b5814d71a87b5b7a390cc4d94bf84dcdb8f6cd6b /src | |
| parent | fix cachbucket mem hit count (#415) (diff) | |
| download | zen-33d443f5361d007f4971bf0d98585b81ee691437.tar.xz zen-33d443f5361d007f4971bf0d98585b81ee691437.zip | |
http client streaming upload (#413)
- Improvement: Add streaming upload from HttpClient to reduce I/O caused by excessive MMap usage
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/basicfile.cpp | 149 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 126 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 11 | ||||
| -rw-r--r-- | src/zencore/iobuffer.cpp | 42 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 310 |
5 files changed, 438 insertions, 200 deletions
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp index 12ee26155..993f2b616 100644 --- a/src/zencore/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -181,58 +181,18 @@ BasicFile::ReadRange(uint64_t FileOffset, uint64_t ByteCount) void BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) { - const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - - while (BytesToRead) + const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; + std::error_code Ec; + ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); + if (Ec) { - const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize); - int32_t Error = 0; - size_t BytesRead = 0; - -#if ZEN_PLATFORM_WINDOWS - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(FileOffset >> 32); - - DWORD dwNumberOfBytesRead = 0; - BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl); - if (Success) - { - BytesRead = size_t(dwNumberOfBytesRead); - } - else - { - Error = zen::GetLastError(); - } -#else - static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); - int Fd = int(uintptr_t(m_FileHandle)); - ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset); - if (ReadResult != -1) - { - BytesRead = size_t(ReadResult); - } - else - { - Error = zen::GetLastError(); - } -#endif - - if (Error || (BytesRead != NumberOfBytesToRead)) - { - std::error_code DummyEc; - throw std::system_error(std::error_code(Error, std::system_category()), - fmt::format("ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", - FileOffset, - NumberOfBytesToRead, - PathFromHandle(m_FileHandle, DummyEc), - FileSizeFromHandle(m_FileHandle))); - } - - BytesToRead -= NumberOfBytesToRead; - FileOffset += NumberOfBytesToRead; - Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead; + std::error_code DummyEc; + throw std::system_error(Ec, + fmt::format("BasicFile::Read: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", + FileOffset, + BytesToRead, + PathFromHandle(m_FileHandle, DummyEc), + FileSizeFromHandle(m_FileHandle))); } } @@ -323,43 +283,9 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec) void BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec) { - ZEN_ASSERT(m_FileHandle != nullptr); - - Ec.clear(); - const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - while (Size) - { - const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); - -#if ZEN_PLATFORM_WINDOWS - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(FileOffset >> 32); - - DWORD dwNumberOfBytesWritten = 0; - - BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); -#else - static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); - int Fd = int(uintptr_t(m_FileHandle)); - int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset); - bool Success = (BytesWritten > 0); -#endif - - if (!Success) - { - Ec = MakeErrorCodeFromLastError(); - - return; - } - - Size -= NumberOfBytesToWrite; - FileOffset += NumberOfBytesToWrite; - Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite; - } + WriteFile(m_FileHandle, Data, Size, FileOffset, MaxChunkSize, Ec); } void @@ -405,59 +331,20 @@ BasicFile::Flush() uint64_t BasicFile::FileSize() const { -#if ZEN_PLATFORM_WINDOWS - ULARGE_INTEGER liFileSize; - liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); - if (liFileSize.LowPart == INVALID_FILE_SIZE) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); - } - } - return uint64_t(liFileSize.QuadPart); -#else - int Fd = int(uintptr_t(m_FileHandle)); - static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files"); - struct stat Stat; - if (fstat(Fd, &Stat) == -1) + std::error_code Ec; + uint64_t FileSize = FileSizeFromHandle(m_FileHandle, Ec); + if (Ec) { std::error_code Dummy; - ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); + ThrowSystemError(Ec.value(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); } - return uint64_t(Stat.st_size); -#endif + return FileSize; } uint64_t BasicFile::FileSize(std::error_code& Ec) const { -#if ZEN_PLATFORM_WINDOWS - ULARGE_INTEGER liFileSize; - liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); - if (liFileSize.LowPart == INVALID_FILE_SIZE) - { - int Error = zen::GetLastError(); - if (Error) - { - Ec = MakeErrorCode(Error); - return 0; - } - } - return uint64_t(liFileSize.QuadPart); -#else - int Fd = int(uintptr_t(m_FileHandle)); - static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files"); - struct stat Stat; - if (fstat(Fd, &Stat) == -1) - { - Ec = MakeErrorCodeFromLastError(); - return 0; - } - return uint64_t(Stat.st_size); -#endif + return FileSizeFromHandle(m_FileHandle, Ec); } void diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index f71397922..0a9b2a73a 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -1052,6 +1052,100 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop } void +WriteFile(void* NativeHandle, const void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec) +{ + ZEN_ASSERT(NativeHandle != nullptr); + + Ec.clear(); + + while (Size) + { + const uint64_t NumberOfBytesToWrite = Min(Size, ChunkSize); + +#if ZEN_PLATFORM_WINDOWS + OVERLAPPED Ovl{}; + + Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(FileOffset >> 32); + + DWORD dwNumberOfBytesWritten = 0; + + BOOL Success = ::WriteFile(NativeHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); +#else + static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); + int Fd = int(uintptr_t(NativeHandle)); + int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset); + bool Success = (BytesWritten > 0); +#endif + + if (!Success) + { + Ec = MakeErrorCodeFromLastError(); + return; + } + + Size -= NumberOfBytesToWrite; + FileOffset += NumberOfBytesToWrite; + Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite; + } +} + +void +ReadFile(void* NativeHandle, void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec) +{ + while (Size) + { + const uint64_t NumberOfBytesToRead = Min(Size, ChunkSize); + size_t BytesRead = 0; + +#if ZEN_PLATFORM_WINDOWS + OVERLAPPED Ovl{}; + + Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(FileOffset >> 32); + + DWORD dwNumberOfBytesRead = 0; + BOOL Success = ::ReadFile(NativeHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl); + if (Success) + { + BytesRead = size_t(dwNumberOfBytesRead); + } + else if ((BytesRead != NumberOfBytesToRead)) + { + Ec = MakeErrorCode(ERROR_HANDLE_EOF); + return; + } + else + { + Ec = MakeErrorCodeFromLastError(); + return; + } +#else + static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); + int Fd = int(uintptr_t(NativeHandle)); + ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset); + if (ReadResult != -1) + { + BytesRead = size_t(ReadResult); + } + else if ((BytesRead != NumberOfBytesToRead)) + { + Ec = MakeErrorCode(EIO); + return; + } + else + { + Ec = MakeErrorCodeFromLastError(); + return; + } +#endif + Size -= NumberOfBytesToRead; + FileOffset += NumberOfBytesToRead; + Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead; + } +} + +void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount) { #if ZEN_PLATFORM_WINDOWS @@ -1921,23 +2015,41 @@ FileSizeFromPath(const std::filesystem::path& Path, std::error_code& Ec) } uint64_t -FileSizeFromHandle(void* NativeHandle) +FileSizeFromHandle(void* NativeHandle, std::error_code& Ec) { - uint64_t FileSize = ~0ull; - #if ZEN_PLATFORM_WINDOWS BY_HANDLE_FILE_INFORMATION Bhfh = {}; if (GetFileInformationByHandle(NativeHandle, &Bhfh)) { - FileSize = uint64_t(Bhfh.nFileSizeHigh) << 32 | Bhfh.nFileSizeLow; + return uint64_t(Bhfh.nFileSizeHigh) << 32 | Bhfh.nFileSizeLow; + } + else + { + Ec = MakeErrorCodeFromLastError(); + return 0; } #else - int Fd = int(intptr_t(NativeHandle)); + int Fd = int(intptr_t(NativeHandle)); + static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files"); struct stat Stat; - fstat(Fd, &Stat); - FileSize = size_t(Stat.st_size); + if (fstat(Fd, &Stat) == -1) + { + Ec = MakeErrorCodeFromLastError(); + return 0; + } + return uint64_t(Stat.st_size); #endif +} +uint64_t +FileSizeFromHandle(void* NativeHandle) +{ + std::error_code Ec; + uint64_t FileSize = FileSizeFromHandle(NativeHandle, Ec); + if (Ec) + { + return ~0ull; + } return FileSize; } diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index e62170eba..dfd0eedc9 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -102,6 +102,10 @@ ZENCORE_API uint64_t FileSizeFromPath(const std::filesystem::path& Path, std::er */ ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle); +/** Query file size from native file handle + */ +ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle, std::error_code& Ec); + /** Get a native time tick of last modification time */ ZENCORE_API uint64_t GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec); @@ -167,6 +171,13 @@ ZENCORE_API void ScanFile(void* NativeHandle, uint64_t Size, uint64_t ChunkSize, std::function<void(const void* Data, size_t Size)>&& ProcessFunc); +ZENCORE_API void WriteFile(void* NativeHandle, + const void* Data, + uint64_t Size, + uint64_t FileOffset, + uint64_t ChunkSize, + std::error_code& Ec); +ZENCORE_API void ReadFile(void* NativeHandle, void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec); struct CopyFileOptions { diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index 3b5c89c3e..8e9a37a27 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -297,49 +297,21 @@ IoBufferExtendedCore::Materialize() const AllocateBuffer(m_DataBytes, sizeof(void*)); NewFlags |= kIsOwnedByThis; - int32_t Error = 0; - size_t BytesRead = 0; -#if ZEN_PLATFORM_WINDOWS - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(m_FileOffset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(m_FileOffset >> 32); - - DWORD dwNumberOfBytesRead = 0; - BOOL Success = ::ReadFile(m_FileHandle, (void*)m_DataPtr, DWORD(m_DataBytes), &dwNumberOfBytesRead, &Ovl) == TRUE; - if (Success) - { - BytesRead = size_t(dwNumberOfBytesRead); - } - else - { - Error = zen::GetLastError(); - } -#else - static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); - int Fd = int(uintptr_t(m_FileHandle)); - ssize_t ReadResult = pread(Fd, (void*)m_DataPtr, m_DataBytes, m_FileOffset); - if (ReadResult != -1) - { - BytesRead = size_t(ReadResult); - } - else - { - Error = zen::GetLastError(); - } -#endif // ZEN_PLATFORM_WINDOWS - if (Error || (BytesRead != m_DataBytes)) + std::error_code Ec; + ReadFile(m_FileHandle, (void*)m_DataPtr, m_DataBytes, m_FileOffset, DisableMMapSizeLimit, Ec); + if (Ec) { std::error_code DummyEc; - ZEN_WARN("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x}), {}", + ZEN_WARN("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x}), {} ({})", m_FileOffset, m_DataBytes, zen::PathFromHandle(m_FileHandle, DummyEc), zen::FileSizeFromHandle(m_FileHandle), - GetSystemErrorAsString(Error)); + Ec.message(), + Ec.value()); throw std::system_error( - std::error_code(Error, std::system_category()), + Ec, fmt::format("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", m_FileOffset, m_DataBytes, diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index a6e4d9290..f2b26b922 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -12,13 +12,19 @@ #include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/logging.h> +#include <zencore/memory/memory.h> #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> #include <zencore/string.h> -#include <zencore/testing.h> #include <zencore/trace.h> +#if ZEN_WITH_TESTS +# include <zencore/basicfile.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -42,6 +48,9 @@ namespace detail { class TempPayloadFile { public: + TempPayloadFile(const TempPayloadFile&) = delete; + TempPayloadFile& operator=(const TempPayloadFile&) = delete; + TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} ~TempPayloadFile() { @@ -271,6 +280,167 @@ namespace detail { std::uint64_t m_CacheBufferOffset = 0; }; + class BufferedReadFileStream + { + public: + BufferedReadFileStream(const BufferedReadFileStream&) = delete; + BufferedReadFileStream& operator=(const BufferedReadFileStream&) = delete; + + BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) + : m_FileHandle(FileHandle) + , m_FileSize(FileSize) + , m_FileEnd(FileOffset + FileSize) + , m_BufferSize(Min(BufferSize, FileSize)) + , m_FileOffset(FileOffset) + { + } + + ~BufferedReadFileStream() { Memory::Free(m_Buffer); } + void Read(void* Data, uint64_t Size) + { + ZEN_ASSERT(Data != nullptr); + if (Size > m_BufferSize) + { + Read(Data, Size, m_FileOffset); + m_FileOffset += Size; + return; + } + uint8_t* WritePtr = ((uint8_t*)Data); + uint64_t Begin = m_FileOffset; + uint64_t End = m_FileOffset + Size; + ZEN_ASSERT(m_FileOffset >= m_BufferStart); + if (m_FileOffset < m_BufferEnd) + { + ZEN_ASSERT(m_Buffer != nullptr); + uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); + Begin += Count; + if (Begin == End) + { + m_FileOffset = End; + return; + } + } + if (End == m_FileEnd) + { + Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); + } + else + { + if (!m_Buffer) + { + m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); + m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize)); + } + m_BufferStart = Begin; + m_BufferEnd = Min(Begin + m_BufferSize, m_FileSize); + Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); + ZEN_ASSERT(Begin + Count == End); + } + m_FileOffset = End; + } + + private: + void Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) + { + const uint64_t MaxChunkSize = 1u * 1024 * 1024; + std::error_code Ec; + ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); + + if (Ec) + { + std::error_code DummyEc; + throw std::system_error( + Ec, + fmt::format( + "HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", + FileOffset, + BytesToRead, + PathFromHandle(m_FileHandle, DummyEc).generic_string(), + m_FileSize)); + } + } + + void* m_FileHandle = nullptr; + const uint64_t m_FileSize = 0; + const uint64_t m_FileEnd = 0; + uint64_t m_BufferSize = 0; + uint8_t* m_Buffer = nullptr; + uint64_t m_BufferStart = 0; + uint64_t m_BufferEnd = 0; + uint64_t m_FileOffset = 0; + }; + + class CompositeBufferReadStream + { + public: + CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) + : m_Data(Data) + , m_BufferSize(BufferSize) + , m_SegmentIndex(0) + , m_BytesLeftInSegment(0) + { + } + uint64_t Read(void* Data, uint64_t Size) + { + uint64_t Result = 0; + uint8_t* WritePtr = (uint8_t*)Data; + while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) + { + if (m_BytesLeftInSegment == 0) + { + const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + m_BufferSize); + } + else + { + m_SegmentMemoryBuffer = Segment.GetView(); + } + m_BytesLeftInSegment = Segment.GetSize(); + } + uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); + if (m_SegmentDiskBuffer) + { + m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); + } + else + { + ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); + memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); + m_SegmentMemoryBuffer.MidInline(BytesToRead); + } + WritePtr += BytesToRead; + Size -= BytesToRead; + Result += BytesToRead; + + m_BytesLeftInSegment -= BytesToRead; + if (m_BytesLeftInSegment == 0) + { + m_SegmentDiskBuffer.reset(); + m_SegmentMemoryBuffer.Reset(); + m_SegmentIndex++; + } + } + return Result; + } + + private: + const CompositeBuffer& m_Data; + const uint64_t m_BufferSize; + size_t m_SegmentIndex; + std::unique_ptr<BufferedReadFileStream> m_SegmentDiskBuffer; + MemoryView m_SegmentMemoryBuffer; + uint64_t m_BytesLeftInSegment; + }; + } // namespace detail ////////////////////////////////////////////////////////////////////////// @@ -1005,9 +1175,22 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C [&]() { Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) + { + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; + }; + return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader({HeaderContentType(ContentType)}); return Sess.Post(); }, m_ConnectionSettings.RetryCount)); @@ -1049,19 +1232,15 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten DoWithRetry( m_SessionId, [&]() { - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); + return true; + }; return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); @@ -1081,16 +1260,16 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - uint64_t Offset = 0; - if (Payload.IsWholeFile()) + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Payload.GetFileReference(FileRef)) { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; + uint64_t Offset = 0; + detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); + auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + Buffer.Read(buffer, size); + Offset += size; + return true; }; return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); } @@ -1114,13 +1293,9 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; + detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); + auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) { + size = Reader.Read(buffer, size); return true; }; return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); @@ -1479,6 +1654,40 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix) #if ZEN_WITH_TESTS +namespace testutil { + IoHash HashComposite(const CompositeBuffer& Payload) + { + IoHashStream Hasher; + const uint64_t PayloadSize = Payload.GetSize(); + std::vector<uint8_t> Buffer(64u * 1024u); + detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); + for (uint64_t Offset = 0; Offset < PayloadSize;) + { + uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + }; + + IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) + { + IoHashStream Hasher; + std::vector<uint8_t> Buffer(64u * 1024u); + detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); + for (uint64_t Offset = 0; Offset < FileSize;) + { + uint64_t Count = Min(64u * 1024u, FileSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + } + +} // namespace testutil + TEST_CASE("responseformat") { using namespace std::literals; @@ -1525,6 +1734,53 @@ TEST_CASE("responseformat") } } +TEST_CASE("BufferedReadFileStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + CHECK(DiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); + CHECK(Partial.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK(SmallDiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), + testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); +} + +TEST_CASE("CompositeBufferReadStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer MemoryBuffer1 = CreateRandomBlob(64); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); + + IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); + + IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); + + IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); + + IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); + + CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), + SharedBuffer(std::move(DiskBuffer1)), + SharedBuffer(std::move(DiskBuffer2)), + SharedBuffer(std::move(MemoryBuffer2)), + SharedBuffer(std::move(DiskBuffer3))); + CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); +} + TEST_CASE("httpclient") { using namespace std::literals; |