aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-02 19:14:52 +0200
committerGitHub Enterprise <[email protected]>2025-06-02 19:14:52 +0200
commit33d443f5361d007f4971bf0d98585b81ee691437 (patch)
treeb5814d71a87b5b7a390cc4d94bf84dcdb8f6cd6b /src
parentfix cachbucket mem hit count (#415) (diff)
downloadzen-33d443f5361d007f4971bf0d98585b81ee691437.tar.xz
zen-33d443f5361d007f4971bf0d98585b81ee691437.zip
http client streaming upload (#413)
- Improvement: Add streaming upload from HttpClient to reduce I/O caused by excessive MMap usage
Diffstat (limited to 'src')
-rw-r--r--src/zencore/basicfile.cpp149
-rw-r--r--src/zencore/filesystem.cpp126
-rw-r--r--src/zencore/include/zencore/filesystem.h11
-rw-r--r--src/zencore/iobuffer.cpp42
-rw-r--r--src/zenhttp/httpclient.cpp310
5 files changed, 438 insertions, 200 deletions
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index 12ee26155..993f2b616 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -181,58 +181,18 @@ BasicFile::ReadRange(uint64_t FileOffset, uint64_t ByteCount)
void
BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
{
- const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
-
- while (BytesToRead)
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
+ std::error_code Ec;
+ ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec);
+ if (Ec)
{
- const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize);
- int32_t Error = 0;
- size_t BytesRead = 0;
-
-#if ZEN_PLATFORM_WINDOWS
- OVERLAPPED Ovl{};
-
- Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(FileOffset >> 32);
-
- DWORD dwNumberOfBytesRead = 0;
- BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl);
- if (Success)
- {
- BytesRead = size_t(dwNumberOfBytesRead);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#else
- static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
- int Fd = int(uintptr_t(m_FileHandle));
- ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset);
- if (ReadResult != -1)
- {
- BytesRead = size_t(ReadResult);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#endif
-
- if (Error || (BytesRead != NumberOfBytesToRead))
- {
- std::error_code DummyEc;
- throw std::system_error(std::error_code(Error, std::system_category()),
- fmt::format("ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
- FileOffset,
- NumberOfBytesToRead,
- PathFromHandle(m_FileHandle, DummyEc),
- FileSizeFromHandle(m_FileHandle)));
- }
-
- BytesToRead -= NumberOfBytesToRead;
- FileOffset += NumberOfBytesToRead;
- Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead;
+ std::error_code DummyEc;
+ throw std::system_error(Ec,
+ fmt::format("BasicFile::Read: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
+ FileOffset,
+ BytesToRead,
+ PathFromHandle(m_FileHandle, DummyEc),
+ FileSizeFromHandle(m_FileHandle)));
}
}
@@ -323,43 +283,9 @@ BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec)
void
BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
{
- ZEN_ASSERT(m_FileHandle != nullptr);
-
- Ec.clear();
-
const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
- while (Size)
- {
- const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize);
-
-#if ZEN_PLATFORM_WINDOWS
- OVERLAPPED Ovl{};
-
- Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(FileOffset >> 32);
-
- DWORD dwNumberOfBytesWritten = 0;
-
- BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
-#else
- static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
- int Fd = int(uintptr_t(m_FileHandle));
- int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset);
- bool Success = (BytesWritten > 0);
-#endif
-
- if (!Success)
- {
- Ec = MakeErrorCodeFromLastError();
-
- return;
- }
-
- Size -= NumberOfBytesToWrite;
- FileOffset += NumberOfBytesToWrite;
- Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite;
- }
+ WriteFile(m_FileHandle, Data, Size, FileOffset, MaxChunkSize, Ec);
}
void
@@ -405,59 +331,20 @@ BasicFile::Flush()
uint64_t
BasicFile::FileSize() const
{
-#if ZEN_PLATFORM_WINDOWS
- ULARGE_INTEGER liFileSize;
- liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart);
- if (liFileSize.LowPart == INVALID_FILE_SIZE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
- }
- }
- return uint64_t(liFileSize.QuadPart);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files");
- struct stat Stat;
- if (fstat(Fd, &Stat) == -1)
+ std::error_code Ec;
+ uint64_t FileSize = FileSizeFromHandle(m_FileHandle, Ec);
+ if (Ec)
{
std::error_code Dummy;
- ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
+ ThrowSystemError(Ec.value(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
}
- return uint64_t(Stat.st_size);
-#endif
+ return FileSize;
}
uint64_t
BasicFile::FileSize(std::error_code& Ec) const
{
-#if ZEN_PLATFORM_WINDOWS
- ULARGE_INTEGER liFileSize;
- liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart);
- if (liFileSize.LowPart == INVALID_FILE_SIZE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- Ec = MakeErrorCode(Error);
- return 0;
- }
- }
- return uint64_t(liFileSize.QuadPart);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files");
- struct stat Stat;
- if (fstat(Fd, &Stat) == -1)
- {
- Ec = MakeErrorCodeFromLastError();
- return 0;
- }
- return uint64_t(Stat.st_size);
-#endif
+ return FileSizeFromHandle(m_FileHandle, Ec);
}
void
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index f71397922..0a9b2a73a 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -1052,6 +1052,100 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop
}
void
+WriteFile(void* NativeHandle, const void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec)
+{
+ ZEN_ASSERT(NativeHandle != nullptr);
+
+ Ec.clear();
+
+ while (Size)
+ {
+ const uint64_t NumberOfBytesToWrite = Min(Size, ChunkSize);
+
+#if ZEN_PLATFORM_WINDOWS
+ OVERLAPPED Ovl{};
+
+ Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(FileOffset >> 32);
+
+ DWORD dwNumberOfBytesWritten = 0;
+
+ BOOL Success = ::WriteFile(NativeHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
+#else
+ static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
+ int Fd = int(uintptr_t(NativeHandle));
+ int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset);
+ bool Success = (BytesWritten > 0);
+#endif
+
+ if (!Success)
+ {
+ Ec = MakeErrorCodeFromLastError();
+ return;
+ }
+
+ Size -= NumberOfBytesToWrite;
+ FileOffset += NumberOfBytesToWrite;
+ Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite;
+ }
+}
+
+void
+ReadFile(void* NativeHandle, void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec)
+{
+ while (Size)
+ {
+ const uint64_t NumberOfBytesToRead = Min(Size, ChunkSize);
+ size_t BytesRead = 0;
+
+#if ZEN_PLATFORM_WINDOWS
+ OVERLAPPED Ovl{};
+
+ Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(FileOffset >> 32);
+
+ DWORD dwNumberOfBytesRead = 0;
+ BOOL Success = ::ReadFile(NativeHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl);
+ if (Success)
+ {
+ BytesRead = size_t(dwNumberOfBytesRead);
+ }
+ else if ((BytesRead != NumberOfBytesToRead))
+ {
+ Ec = MakeErrorCode(ERROR_HANDLE_EOF);
+ return;
+ }
+ else
+ {
+ Ec = MakeErrorCodeFromLastError();
+ return;
+ }
+#else
+ static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
+ int Fd = int(uintptr_t(NativeHandle));
+ ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset);
+ if (ReadResult != -1)
+ {
+ BytesRead = size_t(ReadResult);
+ }
+ else if ((BytesRead != NumberOfBytesToRead))
+ {
+ Ec = MakeErrorCode(EIO);
+ return;
+ }
+ else
+ {
+ Ec = MakeErrorCodeFromLastError();
+ return;
+ }
+#endif
+ Size -= NumberOfBytesToRead;
+ FileOffset += NumberOfBytesToRead;
+ Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead;
+ }
+}
+
+void
WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount)
{
#if ZEN_PLATFORM_WINDOWS
@@ -1921,23 +2015,41 @@ FileSizeFromPath(const std::filesystem::path& Path, std::error_code& Ec)
}
uint64_t
-FileSizeFromHandle(void* NativeHandle)
+FileSizeFromHandle(void* NativeHandle, std::error_code& Ec)
{
- uint64_t FileSize = ~0ull;
-
#if ZEN_PLATFORM_WINDOWS
BY_HANDLE_FILE_INFORMATION Bhfh = {};
if (GetFileInformationByHandle(NativeHandle, &Bhfh))
{
- FileSize = uint64_t(Bhfh.nFileSizeHigh) << 32 | Bhfh.nFileSizeLow;
+ return uint64_t(Bhfh.nFileSizeHigh) << 32 | Bhfh.nFileSizeLow;
+ }
+ else
+ {
+ Ec = MakeErrorCodeFromLastError();
+ return 0;
}
#else
- int Fd = int(intptr_t(NativeHandle));
+ int Fd = int(intptr_t(NativeHandle));
+ static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files");
struct stat Stat;
- fstat(Fd, &Stat);
- FileSize = size_t(Stat.st_size);
+ if (fstat(Fd, &Stat) == -1)
+ {
+ Ec = MakeErrorCodeFromLastError();
+ return 0;
+ }
+ return uint64_t(Stat.st_size);
#endif
+}
+uint64_t
+FileSizeFromHandle(void* NativeHandle)
+{
+ std::error_code Ec;
+ uint64_t FileSize = FileSizeFromHandle(NativeHandle, Ec);
+ if (Ec)
+ {
+ return ~0ull;
+ }
return FileSize;
}
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index e62170eba..dfd0eedc9 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -102,6 +102,10 @@ ZENCORE_API uint64_t FileSizeFromPath(const std::filesystem::path& Path, std::er
*/
ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle);
+/** Query file size from native file handle
+ */
+ZENCORE_API uint64_t FileSizeFromHandle(void* NativeHandle, std::error_code& Ec);
+
/** Get a native time tick of last modification time
*/
ZENCORE_API uint64_t GetModificationTickFromHandle(void* NativeHandle, std::error_code& Ec);
@@ -167,6 +171,13 @@ ZENCORE_API void ScanFile(void* NativeHandle,
uint64_t Size,
uint64_t ChunkSize,
std::function<void(const void* Data, size_t Size)>&& ProcessFunc);
+ZENCORE_API void WriteFile(void* NativeHandle,
+ const void* Data,
+ uint64_t Size,
+ uint64_t FileOffset,
+ uint64_t ChunkSize,
+ std::error_code& Ec);
+ZENCORE_API void ReadFile(void* NativeHandle, void* Data, uint64_t Size, uint64_t FileOffset, uint64_t ChunkSize, std::error_code& Ec);
struct CopyFileOptions
{
diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp
index 3b5c89c3e..8e9a37a27 100644
--- a/src/zencore/iobuffer.cpp
+++ b/src/zencore/iobuffer.cpp
@@ -297,49 +297,21 @@ IoBufferExtendedCore::Materialize() const
AllocateBuffer(m_DataBytes, sizeof(void*));
NewFlags |= kIsOwnedByThis;
- int32_t Error = 0;
- size_t BytesRead = 0;
-#if ZEN_PLATFORM_WINDOWS
- OVERLAPPED Ovl{};
-
- Ovl.Offset = DWORD(m_FileOffset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(m_FileOffset >> 32);
-
- DWORD dwNumberOfBytesRead = 0;
- BOOL Success = ::ReadFile(m_FileHandle, (void*)m_DataPtr, DWORD(m_DataBytes), &dwNumberOfBytesRead, &Ovl) == TRUE;
- if (Success)
- {
- BytesRead = size_t(dwNumberOfBytesRead);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#else
- static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
- int Fd = int(uintptr_t(m_FileHandle));
- ssize_t ReadResult = pread(Fd, (void*)m_DataPtr, m_DataBytes, m_FileOffset);
- if (ReadResult != -1)
- {
- BytesRead = size_t(ReadResult);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#endif // ZEN_PLATFORM_WINDOWS
- if (Error || (BytesRead != m_DataBytes))
+ std::error_code Ec;
+ ReadFile(m_FileHandle, (void*)m_DataPtr, m_DataBytes, m_FileOffset, DisableMMapSizeLimit, Ec);
+ if (Ec)
{
std::error_code DummyEc;
- ZEN_WARN("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x}), {}",
+ ZEN_WARN("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x}), {} ({})",
m_FileOffset,
m_DataBytes,
zen::PathFromHandle(m_FileHandle, DummyEc),
zen::FileSizeFromHandle(m_FileHandle),
- GetSystemErrorAsString(Error));
+ Ec.message(),
+ Ec.value());
throw std::system_error(
- std::error_code(Error, std::system_category()),
+ Ec,
fmt::format("IoBufferExtendedCore::Materialize: ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
m_FileOffset,
m_DataBytes,
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index a6e4d9290..f2b26b922 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -12,13 +12,19 @@
#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zencore/memory/memory.h>
#include <zencore/session.h>
#include <zencore/sharedbuffer.h>
#include <zencore/stream.h>
#include <zencore/string.h>
-#include <zencore/testing.h>
#include <zencore/trace.h>
+#if ZEN_WITH_TESTS
+# include <zencore/basicfile.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -42,6 +48,9 @@ namespace detail {
class TempPayloadFile
{
public:
+ TempPayloadFile(const TempPayloadFile&) = delete;
+ TempPayloadFile& operator=(const TempPayloadFile&) = delete;
+
TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {}
~TempPayloadFile()
{
@@ -271,6 +280,167 @@ namespace detail {
std::uint64_t m_CacheBufferOffset = 0;
};
+ class BufferedReadFileStream
+ {
+ public:
+ BufferedReadFileStream(const BufferedReadFileStream&) = delete;
+ BufferedReadFileStream& operator=(const BufferedReadFileStream&) = delete;
+
+ BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize)
+ : m_FileHandle(FileHandle)
+ , m_FileSize(FileSize)
+ , m_FileEnd(FileOffset + FileSize)
+ , m_BufferSize(Min(BufferSize, FileSize))
+ , m_FileOffset(FileOffset)
+ {
+ }
+
+ ~BufferedReadFileStream() { Memory::Free(m_Buffer); }
+ void Read(void* Data, uint64_t Size)
+ {
+ ZEN_ASSERT(Data != nullptr);
+ if (Size > m_BufferSize)
+ {
+ Read(Data, Size, m_FileOffset);
+ m_FileOffset += Size;
+ return;
+ }
+ uint8_t* WritePtr = ((uint8_t*)Data);
+ uint64_t Begin = m_FileOffset;
+ uint64_t End = m_FileOffset + Size;
+ ZEN_ASSERT(m_FileOffset >= m_BufferStart);
+ if (m_FileOffset < m_BufferEnd)
+ {
+ ZEN_ASSERT(m_Buffer != nullptr);
+ uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset;
+ memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count);
+ Begin += Count;
+ if (Begin == End)
+ {
+ m_FileOffset = End;
+ return;
+ }
+ }
+ if (End == m_FileEnd)
+ {
+ Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin);
+ }
+ else
+ {
+ if (!m_Buffer)
+ {
+ m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize);
+ m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize));
+ }
+ m_BufferStart = Begin;
+ m_BufferEnd = Min(Begin + m_BufferSize, m_FileSize);
+ Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
+ uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
+ memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count);
+ ZEN_ASSERT(Begin + Count == End);
+ }
+ m_FileOffset = End;
+ }
+
+ private:
+ void Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
+ {
+ const uint64_t MaxChunkSize = 1u * 1024 * 1024;
+ std::error_code Ec;
+ ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec);
+
+ if (Ec)
+ {
+ std::error_code DummyEc;
+ throw std::system_error(
+ Ec,
+ fmt::format(
+ "HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
+ FileOffset,
+ BytesToRead,
+ PathFromHandle(m_FileHandle, DummyEc).generic_string(),
+ m_FileSize));
+ }
+ }
+
+ void* m_FileHandle = nullptr;
+ const uint64_t m_FileSize = 0;
+ const uint64_t m_FileEnd = 0;
+ uint64_t m_BufferSize = 0;
+ uint8_t* m_Buffer = nullptr;
+ uint64_t m_BufferStart = 0;
+ uint64_t m_BufferEnd = 0;
+ uint64_t m_FileOffset = 0;
+ };
+
+ class CompositeBufferReadStream
+ {
+ public:
+ CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize)
+ : m_Data(Data)
+ , m_BufferSize(BufferSize)
+ , m_SegmentIndex(0)
+ , m_BytesLeftInSegment(0)
+ {
+ }
+ uint64_t Read(void* Data, uint64_t Size)
+ {
+ uint64_t Result = 0;
+ uint8_t* WritePtr = (uint8_t*)Data;
+ while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size()))
+ {
+ if (m_BytesLeftInSegment == 0)
+ {
+ const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex];
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle,
+ FileRef.FileChunkOffset,
+ FileRef.FileChunkSize,
+ m_BufferSize);
+ }
+ else
+ {
+ m_SegmentMemoryBuffer = Segment.GetView();
+ }
+ m_BytesLeftInSegment = Segment.GetSize();
+ }
+ uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size);
+ if (m_SegmentDiskBuffer)
+ {
+ m_SegmentDiskBuffer->Read(WritePtr, BytesToRead);
+ }
+ else
+ {
+ ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead);
+ memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead);
+ m_SegmentMemoryBuffer.MidInline(BytesToRead);
+ }
+ WritePtr += BytesToRead;
+ Size -= BytesToRead;
+ Result += BytesToRead;
+
+ m_BytesLeftInSegment -= BytesToRead;
+ if (m_BytesLeftInSegment == 0)
+ {
+ m_SegmentDiskBuffer.reset();
+ m_SegmentMemoryBuffer.Reset();
+ m_SegmentIndex++;
+ }
+ }
+ return Result;
+ }
+
+ private:
+ const CompositeBuffer& m_Data;
+ const uint64_t m_BufferSize;
+ size_t m_SegmentIndex;
+ std::unique_ptr<BufferedReadFileStream> m_SegmentDiskBuffer;
+ MemoryView m_SegmentMemoryBuffer;
+ uint64_t m_BytesLeftInSegment;
+ };
+
} // namespace detail
//////////////////////////////////////////////////////////////////////////
@@ -1005,9 +1175,22 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType C
[&]() {
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if (Payload.GetFileReference(FileRef))
+ {
+ uint64_t Offset = 0;
+ detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u);
+ auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ Buffer.Read(buffer, size);
+ Offset += size;
+ return true;
+ };
+ return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ }
Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader({HeaderContentType(ContentType)});
return Sess.Post();
},
m_ConnectionSettings.RetryCount));
@@ -1049,19 +1232,15 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten
DoWithRetry(
m_SessionId,
[&]() {
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->UpdateHeader({HeaderContentType(ContentType)});
+ detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u);
+ auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) {
+ size = Reader.Read(buffer, size);
+ return true;
+ };
return Sess.Post(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
},
m_ConnectionSettings.RetryCount));
@@ -1081,16 +1260,16 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
- uint64_t Offset = 0;
- if (Payload.IsWholeFile())
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if (Payload.GetFileReference(FileRef))
{
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
+ uint64_t Offset = 0;
+ detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u);
+ auto ReadCallback = [&Payload, &Offset, &Buffer](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ Buffer.Read(buffer, size);
+ Offset += size;
+ return true;
};
return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
}
@@ -1114,13 +1293,9 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->UpdateHeader({HeaderContentType(ContentType)});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
+ detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u);
+ auto ReadCallback = [&Reader](char* buffer, size_t& size, intptr_t) {
+ size = Reader.Read(buffer, size);
return true;
};
return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
@@ -1479,6 +1654,40 @@ HttpClient::Response::ThrowError(std::string_view ErrorPrefix)
#if ZEN_WITH_TESTS
+namespace testutil {
+ IoHash HashComposite(const CompositeBuffer& Payload)
+ {
+ IoHashStream Hasher;
+ const uint64_t PayloadSize = Payload.GetSize();
+ std::vector<uint8_t> Buffer(64u * 1024u);
+ detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u);
+ for (uint64_t Offset = 0; Offset < PayloadSize;)
+ {
+ uint64_t Count = Min(64u * 1024u, PayloadSize - Offset);
+ Stream.Read(Buffer.data(), Count);
+ Hasher.Append(Buffer.data(), Count);
+ Offset += Count;
+ }
+ return Hasher.GetHash();
+ };
+
+ IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize)
+ {
+ IoHashStream Hasher;
+ std::vector<uint8_t> Buffer(64u * 1024u);
+ detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u);
+ for (uint64_t Offset = 0; Offset < FileSize;)
+ {
+ uint64_t Count = Min(64u * 1024u, FileSize - Offset);
+ Stream.Read(Buffer.data(), Count);
+ Hasher.Append(Buffer.data(), Count);
+ Offset += Count;
+ }
+ return Hasher.GetHash();
+ }
+
+} // namespace testutil
+
TEST_CASE("responseformat")
{
using namespace std::literals;
@@ -1525,6 +1734,53 @@ TEST_CASE("responseformat")
}
}
+TEST_CASE("BufferedReadFileStream")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1");
+
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ CHECK(DiskBuffer.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+
+ IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024);
+ CHECK(Partial.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+
+ IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2");
+ CHECK(SmallDiskBuffer.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer),
+ testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+}
+
+TEST_CASE("CompositeBufferReadStream")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ IoBuffer MemoryBuffer1 = CreateRandomBlob(64);
+ CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1))));
+
+ IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024);
+ CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2))));
+
+ IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1))));
+
+ IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2))));
+
+ IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3))));
+
+ CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)),
+ SharedBuffer(std::move(DiskBuffer1)),
+ SharedBuffer(std::move(DiskBuffer2)),
+ SharedBuffer(std::move(MemoryBuffer2)),
+ SharedBuffer(std::move(DiskBuffer3)));
+ CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data));
+}
+
TEST_CASE("httpclient")
{
using namespace std::literals;