diff options
| author | Stefan Boberg <[email protected]> | 2025-09-30 19:07:51 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-09-30 19:07:51 +0200 |
| commit | 634181a04efff90def7a98d98eac7078e1d4e62d (patch) | |
| tree | 04678bba636a76d21f300ff6e73af4473274cf12 /src/zenhttp/clients/httpclientcommon.cpp | |
| parent | use batching clang-format for quicker turnaround on validate actions (#529) (diff) | |
| download | zen-634181a04efff90def7a98d98eac7078e1d4e62d.tar.xz zen-634181a04efff90def7a98d98eac7078e1d4e62d.zip | |
HttpClient support for pluggable back-ends (#532)
refactored HttpClient to separate out cpr implementation into separate classes, with an abstract base class to allow plugging in multiple implementations in the future
Diffstat (limited to 'src/zenhttp/clients/httpclientcommon.cpp')
| -rw-r--r-- | src/zenhttp/clients/httpclientcommon.cpp | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/src/zenhttp/clients/httpclientcommon.cpp b/src/zenhttp/clients/httpclientcommon.cpp new file mode 100644 index 000000000..8e5136dff --- /dev/null +++ b/src/zenhttp/clients/httpclientcommon.cpp @@ -0,0 +1,474 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpclientcommon.h" + +#include <fmt/format.h> +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zencore/memory/memory.h> +#include <zencore/windows.h> +#include <gsl/gsl-lite.hpp> + +#if ZEN_WITH_TESTS +# include <zencore/basicfile.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif // ZEN_WITH_TESTS + +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC +# include <fcntl.h> +# include <sys/stat.h> +# include <unistd.h> +#endif + +namespace zen { + +using namespace std::literals; + +namespace detail { + + static std::atomic_uint32_t TempFileBaseIndex; + + TempPayloadFile::TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} + TempPayloadFile::~TempPayloadFile() + { + ZEN_TRACE_CPU("TempPayloadFile::Close"); + try + { + if (m_FileHandle) + { +#if ZEN_PLATFORM_WINDOWS + // Mark file for deletion when final handle is closed + FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; + + SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); + BOOL Success = CloseHandle(m_FileHandle); +#else + std::error_code Ec; + std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec); + if (Ec) + { + ZEN_WARN("Error reported on get file path from handle {} for temp payload unlink operation, reason '{}'", + m_FileHandle, + Ec.message()); + } + else + { + unlink(FilePath.c_str()); + } + int Fd = int(uintptr_t(m_FileHandle)); + bool Success = (close(Fd) == 0); +#endif + if (!Success) + { + ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); + } + + m_FileHandle = nullptr; + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed deleting temp file {}. Reason '{}'", m_FileHandle, Ex.what()); + } + } + + std::error_code TempPayloadFile::Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) + { + ZEN_TRACE_CPU("TempPayloadFile::Open"); + ZEN_ASSERT(m_FileHandle == nullptr); + + std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | + detail::TempFileBaseIndex.fetch_add(1); + + std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex); +#if ZEN_PLATFORM_WINDOWS + LPCWSTR lpFileName = FileName.c_str(); + const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE); + const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; + LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr; + const DWORD dwCreationDisposition = CREATE_ALWAYS; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + const HANDLE hTemplateFile = nullptr; + const HANDLE FileHandle = CreateFile(lpFileName, + dwDesiredAccess, + dwShareMode, + lpSecurityAttributes, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + return MakeErrorCodeFromLastError(); + } +#else // ZEN_PLATFORM_WINDOWS + int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC; + int Fd = open(FileName.c_str(), OpenFlags, 0666); + if (Fd < 0) + { + return MakeErrorCodeFromLastError(); + } + fchmod(Fd, 0666); + + void* FileHandle = (void*)(uintptr_t(Fd)); +#endif // ZEN_PLATFORM_WINDOWS + m_FileHandle = FileHandle; + + PrepareFileForScatteredWrite(m_FileHandle, FinalSize); + + return {}; + } + + std::error_code TempPayloadFile::Write(std::string_view DataString) + { + ZEN_TRACE_CPU("TempPayloadFile::Write"); + const uint8_t* DataPtr = (const uint8_t*)DataString.data(); + size_t DataSize = DataString.size(); + if (DataSize >= CacheBufferSize) + { + std::error_code Ec = Flush(); + if (Ec) + { + return Ec; + } + return AppendData(DataPtr, DataSize); + } + size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset); + memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize); + m_CacheBufferOffset += CopySize; + DataSize -= CopySize; + if (m_CacheBufferOffset == CacheBufferSize) + { + AppendData(m_CacheBuffer, CacheBufferSize); + if (DataSize > 0) + { + ZEN_ASSERT(DataSize < CacheBufferSize); + memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize); + } + m_CacheBufferOffset = DataSize; + } + else + { + ZEN_ASSERT(DataSize == 0); + } + return {}; + } + + IoBuffer TempPayloadFile::DetachToIoBuffer() + { + ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); + if (std::error_code Ec = Flush(); Ec) + { + ThrowSystemError(Ec.value(), Ec.message()); + } + ZEN_ASSERT(m_FileHandle != nullptr); + void* FileHandle = m_FileHandle; + IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); + Buffer.SetDeleteOnClose(true); + m_FileHandle = 0; + m_WriteOffset = 0; + return Buffer; + } + + IoBuffer TempPayloadFile::BorrowIoBuffer() + { + ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); + if (std::error_code Ec = Flush(); Ec) + { + ThrowSystemError(Ec.value(), Ec.message()); + } + ZEN_ASSERT(m_FileHandle != nullptr); + void* FileHandle = m_FileHandle; + IoBuffer Buffer(IoBuffer::BorrowedFile, FileHandle, 0, m_WriteOffset); + return Buffer; + } + + void TempPayloadFile::ResetWritePos(uint64_t WriteOffset) + { + ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); + Flush(); + m_WriteOffset = WriteOffset; + } + + std::error_code TempPayloadFile::Flush() + { + ZEN_TRACE_CPU("TempPayloadFile::Flush"); + if (m_CacheBufferOffset == 0) + { + return {}; + } + std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset); + m_CacheBufferOffset = 0; + return Res; + } + + std::error_code TempPayloadFile::AppendData(const void* Data, uint64_t Size) + { + ZEN_TRACE_CPU("TempPayloadFile::AppendData"); + ZEN_ASSERT(m_FileHandle != nullptr); + const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; + + while (Size) + { + const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); + uint64_t NumberOfBytesWritten = 0; +#if ZEN_PLATFORM_WINDOWS + OVERLAPPED Ovl{}; + + Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu); + Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32); + + DWORD dwNumberOfBytesWritten = 0; + + BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); + if (Success) + { + NumberOfBytesWritten = static_cast<uint64_t>(dwNumberOfBytesWritten); + } +#else + static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); + int Fd = int(uintptr_t(m_FileHandle)); + int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset); + bool Success = (BytesWritten > 0); + if (Success) + { + NumberOfBytesWritten = static_cast<uint64_t>(BytesWritten); + } +#endif + + if (!Success) + { + return MakeErrorCodeFromLastError(); + } + + Size -= NumberOfBytesWritten; + m_WriteOffset += NumberOfBytesWritten; + Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesWritten; + } + return {}; + } + + BufferedReadFileStream::BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) + : m_FileHandle(FileHandle) + , m_FileSize(FileSize) + , m_FileEnd(FileOffset + FileSize) + , m_BufferSize(Min(BufferSize, FileSize)) + , m_FileOffset(FileOffset) + { + } + + BufferedReadFileStream::~BufferedReadFileStream() { Memory::Free(m_Buffer); } + void BufferedReadFileStream::Read(void* Data, uint64_t Size) + { + ZEN_ASSERT(Data != nullptr); + if (Size > m_BufferSize) + { + Read(Data, Size, m_FileOffset); + m_FileOffset += Size; + return; + } + uint8_t* WritePtr = ((uint8_t*)Data); + uint64_t Begin = m_FileOffset; + uint64_t End = m_FileOffset + Size; + ZEN_ASSERT(m_FileOffset >= m_BufferStart); + if (m_FileOffset < m_BufferEnd) + { + ZEN_ASSERT(m_Buffer != nullptr); + uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); + Begin += Count; + if (Begin == End) + { + m_FileOffset = End; + return; + } + } + if (End == m_FileEnd) + { + Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); + } + else + { + if (!m_Buffer) + { + m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); + m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize)); + } + m_BufferStart = Begin; + m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); + Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); + uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; + memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); + ZEN_ASSERT(Begin + Count == End); + } + m_FileOffset = End; + } + + void BufferedReadFileStream::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) + { + const uint64_t MaxChunkSize = 1u * 1024 * 1024; + std::error_code Ec; + ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); + + if (Ec) + { + std::error_code DummyEc; + throw std::system_error( + Ec, + fmt::format("HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", + FileOffset, + BytesToRead, + PathFromHandle(m_FileHandle, DummyEc).generic_string(), + m_FileSize)); + } + } + + CompositeBufferReadStream::CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) + : m_Data(Data) + , m_BufferSize(BufferSize) + , m_SegmentIndex(0) + , m_BytesLeftInSegment(0) + { + } + uint64_t CompositeBufferReadStream::Read(void* Data, uint64_t Size) + { + uint64_t Result = 0; + uint8_t* WritePtr = (uint8_t*)Data; + while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) + { + if (m_BytesLeftInSegment == 0) + { + const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + m_SegmentDiskBuffer = std::make_unique<BufferedReadFileStream>(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + m_BufferSize); + } + else + { + m_SegmentMemoryBuffer = Segment.GetView(); + } + m_BytesLeftInSegment = Segment.GetSize(); + } + uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); + if (m_SegmentDiskBuffer) + { + m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); + } + else + { + ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); + memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); + m_SegmentMemoryBuffer.MidInline(BytesToRead); + } + WritePtr += BytesToRead; + Size -= BytesToRead; + Result += BytesToRead; + + m_BytesLeftInSegment -= BytesToRead; + if (m_BytesLeftInSegment == 0) + { + m_SegmentDiskBuffer.reset(); + m_SegmentMemoryBuffer.Reset(); + m_SegmentIndex++; + } + } + return Result; + } + +} // namespace detail + +} // namespace zen + +#if ZEN_WITH_TESTS +namespace zen { + +namespace testutil { + IoHash HashComposite(const CompositeBuffer& Payload) + { + IoHashStream Hasher; + const uint64_t PayloadSize = Payload.GetSize(); + std::vector<uint8_t> Buffer(64u * 1024u); + detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); + for (uint64_t Offset = 0; Offset < PayloadSize;) + { + uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + }; + + IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) + { + IoHashStream Hasher; + std::vector<uint8_t> Buffer(64u * 1024u); + detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); + for (uint64_t Offset = 0; Offset < FileSize;) + { + uint64_t Count = Min(64u * 1024u, FileSize - Offset); + Stream.Read(Buffer.data(), Count); + Hasher.Append(Buffer.data(), Count); + Offset += Count; + } + return Hasher.GetHash(); + } + +} // namespace testutil + +TEST_CASE("BufferedReadFileStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + CHECK(DiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); + CHECK(Partial.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); + + IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK(SmallDiskBuffer.GetFileReference(FileRef)); + CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), + testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); +} + +TEST_CASE("CompositeBufferReadStream") +{ + ScopedTemporaryDirectory TmpDir; + + IoBuffer MemoryBuffer1 = CreateRandomBlob(64); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); + + IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); + CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); + + IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); + + IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); + + IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); + CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); + + CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), + SharedBuffer(std::move(DiskBuffer1)), + SharedBuffer(std::move(DiskBuffer2)), + SharedBuffer(std::move(MemoryBuffer2)), + SharedBuffer(std::move(DiskBuffer3))); + CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); +} + +} // namespace zen +#endif |