aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients/httpclientcommon.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-09-30 19:07:51 +0200
committerGitHub Enterprise <[email protected]>2025-09-30 19:07:51 +0200
commit634181a04efff90def7a98d98eac7078e1d4e62d (patch)
tree04678bba636a76d21f300ff6e73af4473274cf12 /src/zenhttp/clients/httpclientcommon.cpp
parentuse batching clang-format for quicker turnaround on validate actions (#529) (diff)
downloadzen-634181a04efff90def7a98d98eac7078e1d4e62d.tar.xz
zen-634181a04efff90def7a98d98eac7078e1d4e62d.zip
HttpClient support for pluggable back-ends (#532)
refactored HttpClient to separate out cpr implementation into separate classes, with an abstract base class to allow plugging in multiple implementations in the future
Diffstat (limited to 'src/zenhttp/clients/httpclientcommon.cpp')
-rw-r--r--src/zenhttp/clients/httpclientcommon.cpp474
1 files changed, 474 insertions, 0 deletions
diff --git a/src/zenhttp/clients/httpclientcommon.cpp b/src/zenhttp/clients/httpclientcommon.cpp
new file mode 100644
index 000000000..8e5136dff
--- /dev/null
+++ b/src/zenhttp/clients/httpclientcommon.cpp
@@ -0,0 +1,474 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "httpclientcommon.h"
+
+#include <fmt/format.h>
+#include <zencore/except.h>
+#include <zencore/filesystem.h>
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zencore/memory/memory.h>
+#include <zencore/windows.h>
+#include <gsl/gsl-lite.hpp>
+
+#if ZEN_WITH_TESTS
+# include <zencore/basicfile.h>
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
+#endif // ZEN_WITH_TESTS
+
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+# include <fcntl.h>
+# include <sys/stat.h>
+# include <unistd.h>
+#endif
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail {
+
+ static std::atomic_uint32_t TempFileBaseIndex;
+
+ TempPayloadFile::TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {}
+ TempPayloadFile::~TempPayloadFile()
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::Close");
+ try
+ {
+ if (m_FileHandle)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ // Mark file for deletion when final handle is closed
+ FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE};
+
+ SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
+ BOOL Success = CloseHandle(m_FileHandle);
+#else
+ std::error_code Ec;
+ std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Error reported on get file path from handle {} for temp payload unlink operation, reason '{}'",
+ m_FileHandle,
+ Ec.message());
+ }
+ else
+ {
+ unlink(FilePath.c_str());
+ }
+ int Fd = int(uintptr_t(m_FileHandle));
+ bool Success = (close(Fd) == 0);
+#endif
+ if (!Success)
+ {
+ ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString());
+ }
+
+ m_FileHandle = nullptr;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed deleting temp file {}. Reason '{}'", m_FileHandle, Ex.what());
+ }
+ }
+
+ std::error_code TempPayloadFile::Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize)
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::Open");
+ ZEN_ASSERT(m_FileHandle == nullptr);
+
+ std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) |
+ detail::TempFileBaseIndex.fetch_add(1);
+
+ std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex);
+#if ZEN_PLATFORM_WINDOWS
+ LPCWSTR lpFileName = FileName.c_str();
+ const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE);
+ const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
+ LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr;
+ const DWORD dwCreationDisposition = CREATE_ALWAYS;
+ const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
+ const HANDLE hTemplateFile = nullptr;
+ const HANDLE FileHandle = CreateFile(lpFileName,
+ dwDesiredAccess,
+ dwShareMode,
+ lpSecurityAttributes,
+ dwCreationDisposition,
+ dwFlagsAndAttributes,
+ hTemplateFile);
+
+ if (FileHandle == INVALID_HANDLE_VALUE)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+#else // ZEN_PLATFORM_WINDOWS
+ int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC;
+ int Fd = open(FileName.c_str(), OpenFlags, 0666);
+ if (Fd < 0)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+ fchmod(Fd, 0666);
+
+ void* FileHandle = (void*)(uintptr_t(Fd));
+#endif // ZEN_PLATFORM_WINDOWS
+ m_FileHandle = FileHandle;
+
+ PrepareFileForScatteredWrite(m_FileHandle, FinalSize);
+
+ return {};
+ }
+
+ std::error_code TempPayloadFile::Write(std::string_view DataString)
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::Write");
+ const uint8_t* DataPtr = (const uint8_t*)DataString.data();
+ size_t DataSize = DataString.size();
+ if (DataSize >= CacheBufferSize)
+ {
+ std::error_code Ec = Flush();
+ if (Ec)
+ {
+ return Ec;
+ }
+ return AppendData(DataPtr, DataSize);
+ }
+ size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset);
+ memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize);
+ m_CacheBufferOffset += CopySize;
+ DataSize -= CopySize;
+ if (m_CacheBufferOffset == CacheBufferSize)
+ {
+ AppendData(m_CacheBuffer, CacheBufferSize);
+ if (DataSize > 0)
+ {
+ ZEN_ASSERT(DataSize < CacheBufferSize);
+ memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize);
+ }
+ m_CacheBufferOffset = DataSize;
+ }
+ else
+ {
+ ZEN_ASSERT(DataSize == 0);
+ }
+ return {};
+ }
+
+ IoBuffer TempPayloadFile::DetachToIoBuffer()
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer");
+ if (std::error_code Ec = Flush(); Ec)
+ {
+ ThrowSystemError(Ec.value(), Ec.message());
+ }
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ void* FileHandle = m_FileHandle;
+ IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
+ Buffer.SetDeleteOnClose(true);
+ m_FileHandle = 0;
+ m_WriteOffset = 0;
+ return Buffer;
+ }
+
+ IoBuffer TempPayloadFile::BorrowIoBuffer()
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer");
+ if (std::error_code Ec = Flush(); Ec)
+ {
+ ThrowSystemError(Ec.value(), Ec.message());
+ }
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ void* FileHandle = m_FileHandle;
+ IoBuffer Buffer(IoBuffer::BorrowedFile, FileHandle, 0, m_WriteOffset);
+ return Buffer;
+ }
+
+ void TempPayloadFile::ResetWritePos(uint64_t WriteOffset)
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos");
+ Flush();
+ m_WriteOffset = WriteOffset;
+ }
+
+ std::error_code TempPayloadFile::Flush()
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::Flush");
+ if (m_CacheBufferOffset == 0)
+ {
+ return {};
+ }
+ std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset);
+ m_CacheBufferOffset = 0;
+ return Res;
+ }
+
+ std::error_code TempPayloadFile::AppendData(const void* Data, uint64_t Size)
+ {
+ ZEN_TRACE_CPU("TempPayloadFile::AppendData");
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
+
+ while (Size)
+ {
+ const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize);
+ uint64_t NumberOfBytesWritten = 0;
+#if ZEN_PLATFORM_WINDOWS
+ OVERLAPPED Ovl{};
+
+ Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu);
+ Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32);
+
+ DWORD dwNumberOfBytesWritten = 0;
+
+ BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
+ if (Success)
+ {
+ NumberOfBytesWritten = static_cast<uint64_t>(dwNumberOfBytesWritten);
+ }
+#else
+ static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
+ int Fd = int(uintptr_t(m_FileHandle));
+ int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset);
+ bool Success = (BytesWritten > 0);
+ if (Success)
+ {
+ NumberOfBytesWritten = static_cast<uint64_t>(BytesWritten);
+ }
+#endif
+
+ if (!Success)
+ {
+ return MakeErrorCodeFromLastError();
+ }
+
+ Size -= NumberOfBytesWritten;
+ m_WriteOffset += NumberOfBytesWritten;
+ Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesWritten;
+ }
+ return {};
+ }
+
+ BufferedReadFileStream::BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize)
+ : m_FileHandle(FileHandle)
+ , m_FileSize(FileSize)
+ , m_FileEnd(FileOffset + FileSize)
+ , m_BufferSize(Min(BufferSize, FileSize))
+ , m_FileOffset(FileOffset)
+ {
+ }
+
+ BufferedReadFileStream::~BufferedReadFileStream() { Memory::Free(m_Buffer); }
+ void BufferedReadFileStream::Read(void* Data, uint64_t Size)
+ {
+ ZEN_ASSERT(Data != nullptr);
+ if (Size > m_BufferSize)
+ {
+ Read(Data, Size, m_FileOffset);
+ m_FileOffset += Size;
+ return;
+ }
+ uint8_t* WritePtr = ((uint8_t*)Data);
+ uint64_t Begin = m_FileOffset;
+ uint64_t End = m_FileOffset + Size;
+ ZEN_ASSERT(m_FileOffset >= m_BufferStart);
+ if (m_FileOffset < m_BufferEnd)
+ {
+ ZEN_ASSERT(m_Buffer != nullptr);
+ uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset;
+ memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count);
+ Begin += Count;
+ if (Begin == End)
+ {
+ m_FileOffset = End;
+ return;
+ }
+ }
+ if (End == m_FileEnd)
+ {
+ Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin);
+ }
+ else
+ {
+ if (!m_Buffer)
+ {
+ m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize);
+ m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize));
+ }
+ m_BufferStart = Begin;
+ m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd);
+ Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
+ uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
+ memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count);
+ ZEN_ASSERT(Begin + Count == End);
+ }
+ m_FileOffset = End;
+ }
+
+ void BufferedReadFileStream::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
+ {
+ const uint64_t MaxChunkSize = 1u * 1024 * 1024;
+ std::error_code Ec;
+ ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec);
+
+ if (Ec)
+ {
+ std::error_code DummyEc;
+ throw std::system_error(
+ Ec,
+ fmt::format("HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
+ FileOffset,
+ BytesToRead,
+ PathFromHandle(m_FileHandle, DummyEc).generic_string(),
+ m_FileSize));
+ }
+ }
+
+ CompositeBufferReadStream::CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize)
+ : m_Data(Data)
+ , m_BufferSize(BufferSize)
+ , m_SegmentIndex(0)
+ , m_BytesLeftInSegment(0)
+ {
+ }
+ uint64_t CompositeBufferReadStream::Read(void* Data, uint64_t Size)
+ {
+ uint64_t Result = 0;
+ uint8_t* WritePtr = (uint8_t*)Data;
+ while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size()))
+ {
+ if (m_BytesLeftInSegment == 0)
+ {
+ const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex];
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle,
+ FileRef.FileChunkOffset,
+ FileRef.FileChunkSize,
+ m_BufferSize);
+ }
+ else
+ {
+ m_SegmentMemoryBuffer = Segment.GetView();
+ }
+ m_BytesLeftInSegment = Segment.GetSize();
+ }
+ uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size);
+ if (m_SegmentDiskBuffer)
+ {
+ m_SegmentDiskBuffer->Read(WritePtr, BytesToRead);
+ }
+ else
+ {
+ ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead);
+ memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead);
+ m_SegmentMemoryBuffer.MidInline(BytesToRead);
+ }
+ WritePtr += BytesToRead;
+ Size -= BytesToRead;
+ Result += BytesToRead;
+
+ m_BytesLeftInSegment -= BytesToRead;
+ if (m_BytesLeftInSegment == 0)
+ {
+ m_SegmentDiskBuffer.reset();
+ m_SegmentMemoryBuffer.Reset();
+ m_SegmentIndex++;
+ }
+ }
+ return Result;
+ }
+
+} // namespace detail
+
+} // namespace zen
+
+#if ZEN_WITH_TESTS
+namespace zen {
+
+namespace testutil {
+ IoHash HashComposite(const CompositeBuffer& Payload)
+ {
+ IoHashStream Hasher;
+ const uint64_t PayloadSize = Payload.GetSize();
+ std::vector<uint8_t> Buffer(64u * 1024u);
+ detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u);
+ for (uint64_t Offset = 0; Offset < PayloadSize;)
+ {
+ uint64_t Count = Min(64u * 1024u, PayloadSize - Offset);
+ Stream.Read(Buffer.data(), Count);
+ Hasher.Append(Buffer.data(), Count);
+ Offset += Count;
+ }
+ return Hasher.GetHash();
+ };
+
+ IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize)
+ {
+ IoHashStream Hasher;
+ std::vector<uint8_t> Buffer(64u * 1024u);
+ detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u);
+ for (uint64_t Offset = 0; Offset < FileSize;)
+ {
+ uint64_t Count = Min(64u * 1024u, FileSize - Offset);
+ Stream.Read(Buffer.data(), Count);
+ Hasher.Append(Buffer.data(), Count);
+ Offset += Count;
+ }
+ return Hasher.GetHash();
+ }
+
+} // namespace testutil
+
+TEST_CASE("BufferedReadFileStream")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1");
+
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ CHECK(DiskBuffer.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+
+ IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024);
+ CHECK(Partial.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+
+ IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2");
+ CHECK(SmallDiskBuffer.GetFileReference(FileRef));
+ CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer),
+ testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize));
+}
+
+TEST_CASE("CompositeBufferReadStream")
+{
+ ScopedTemporaryDirectory TmpDir;
+
+ IoBuffer MemoryBuffer1 = CreateRandomBlob(64);
+ CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1))));
+
+ IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024);
+ CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2))));
+
+ IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1))));
+
+ IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2))));
+
+ IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3");
+ CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3))));
+
+ CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)),
+ SharedBuffer(std::move(DiskBuffer1)),
+ SharedBuffer(std::move(DiskBuffer2)),
+ SharedBuffer(std::move(MemoryBuffer2)),
+ SharedBuffer(std::move(DiskBuffer3)));
+ CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data));
+}
+
+} // namespace zen
+#endif