// Copyright Epic Games, Inc. All Rights Reserved. #include "httpclientcommon.h" #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include #endif // ZEN_WITH_TESTS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC # include # include # include #endif namespace zen { using namespace std::literals; namespace detail { static std::atomic_uint32_t TempFileBaseIndex; TempPayloadFile::TempPayloadFile() : m_FileHandle(nullptr), m_WriteOffset(0) {} TempPayloadFile::~TempPayloadFile() { ZEN_TRACE_CPU("TempPayloadFile::Close"); try { if (m_FileHandle) { #if ZEN_PLATFORM_WINDOWS // Mark file for deletion when final handle is closed FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); BOOL Success = CloseHandle(m_FileHandle); #else std::error_code Ec; std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec); if (Ec) { ZEN_WARN("Error reported on get file path from handle {} for temp payload unlink operation, reason '{}'", m_FileHandle, Ec.message()); } else { unlink(FilePath.c_str()); } int Fd = int(uintptr_t(m_FileHandle)); bool Success = (close(Fd) == 0); #endif if (!Success) { ZEN_WARN("Error reported on file handle close, reason '{}'", GetLastErrorAsString()); } m_FileHandle = nullptr; } } catch (const std::exception& Ex) { ZEN_ERROR("Failed deleting temp file {}. Reason '{}'", m_FileHandle, Ex.what()); } } std::error_code TempPayloadFile::Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) { ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); std::uint64_t TmpIndex = ((std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) & 0xffffffffu) << 32) | detail::TempFileBaseIndex.fetch_add(1); std::filesystem::path FileName = TempFolderPath / fmt::to_string(TmpIndex); #if ZEN_PLATFORM_WINDOWS LPCWSTR lpFileName = FileName.c_str(); const DWORD dwDesiredAccess = (GENERIC_READ | GENERIC_WRITE | DELETE); const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; LPSECURITY_ATTRIBUTES lpSecurityAttributes = nullptr; const DWORD dwCreationDisposition = CREATE_ALWAYS; const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; const HANDLE hTemplateFile = nullptr; const HANDLE FileHandle = CreateFile(lpFileName, dwDesiredAccess, dwShareMode, lpSecurityAttributes, dwCreationDisposition, dwFlagsAndAttributes, hTemplateFile); if (FileHandle == INVALID_HANDLE_VALUE) { return MakeErrorCodeFromLastError(); } #else // ZEN_PLATFORM_WINDOWS int OpenFlags = O_RDWR | O_CREAT | O_TRUNC | O_CLOEXEC; int Fd = open(FileName.c_str(), OpenFlags, 0666); if (Fd < 0) { return MakeErrorCodeFromLastError(); } fchmod(Fd, 0666); void* FileHandle = (void*)(uintptr_t(Fd)); #endif // ZEN_PLATFORM_WINDOWS m_FileHandle = FileHandle; PrepareFileForScatteredWrite(m_FileHandle, FinalSize); return {}; } std::error_code TempPayloadFile::Write(std::string_view DataString) { ZEN_TRACE_CPU("TempPayloadFile::Write"); const uint8_t* DataPtr = (const uint8_t*)DataString.data(); size_t DataSize = DataString.size(); if (DataSize >= CacheBufferSize) { std::error_code Ec = Flush(); if (Ec) { return Ec; } return AppendData(DataPtr, DataSize); } size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset); memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize); m_CacheBufferOffset += CopySize; DataSize -= CopySize; if (m_CacheBufferOffset == CacheBufferSize) { AppendData(m_CacheBuffer, CacheBufferSize); if (DataSize > 0) { ZEN_ASSERT(DataSize < CacheBufferSize); memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize); } m_CacheBufferOffset = DataSize; } else { ZEN_ASSERT(DataSize == 0); } return {}; } IoBuffer TempPayloadFile::DetachToIoBuffer() { ZEN_TRACE_CPU("TempPayloadFile::DetachToIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); } ZEN_ASSERT(m_FileHandle != nullptr); void* FileHandle = m_FileHandle; IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); Buffer.SetDeleteOnClose(true); m_FileHandle = 0; m_WriteOffset = 0; return Buffer; } IoBuffer TempPayloadFile::BorrowIoBuffer() { ZEN_TRACE_CPU("TempPayloadFile::BorrowIoBuffer"); if (std::error_code Ec = Flush(); Ec) { ThrowSystemError(Ec.value(), Ec.message()); } ZEN_ASSERT(m_FileHandle != nullptr); void* FileHandle = m_FileHandle; IoBuffer Buffer(IoBuffer::BorrowedFile, FileHandle, 0, m_WriteOffset); return Buffer; } void TempPayloadFile::ResetWritePos(uint64_t WriteOffset) { ZEN_TRACE_CPU("TempPayloadFile::ResetWritePos"); Flush(); m_WriteOffset = WriteOffset; } std::error_code TempPayloadFile::Flush() { ZEN_TRACE_CPU("TempPayloadFile::Flush"); if (m_CacheBufferOffset == 0) { return {}; } std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset); m_CacheBufferOffset = 0; return Res; } std::error_code TempPayloadFile::AppendData(const void* Data, uint64_t Size) { ZEN_TRACE_CPU("TempPayloadFile::AppendData"); ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; while (Size) { const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); uint64_t NumberOfBytesWritten = 0; #if ZEN_PLATFORM_WINDOWS OVERLAPPED Ovl{}; Ovl.Offset = DWORD(m_WriteOffset & 0xffff'ffffu); Ovl.OffsetHigh = DWORD(m_WriteOffset >> 32); DWORD dwNumberOfBytesWritten = 0; BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); if (Success) { NumberOfBytesWritten = static_cast(dwNumberOfBytesWritten); } #else static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); int Fd = int(uintptr_t(m_FileHandle)); int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, m_WriteOffset); bool Success = (BytesWritten > 0); if (Success) { NumberOfBytesWritten = static_cast(BytesWritten); } #endif if (!Success) { return MakeErrorCodeFromLastError(); } Size -= NumberOfBytesWritten; m_WriteOffset += NumberOfBytesWritten; Data = reinterpret_cast(Data) + NumberOfBytesWritten; } return {}; } BufferedReadFileStream::BufferedReadFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize, uint64_t BufferSize) : m_FileHandle(FileHandle) , m_FileSize(FileSize) , m_FileEnd(FileOffset + FileSize) , m_BufferSize(Min(BufferSize, FileSize)) , m_FileOffset(FileOffset) { } BufferedReadFileStream::~BufferedReadFileStream() { Memory::Free(m_Buffer); } void BufferedReadFileStream::Read(void* Data, uint64_t Size) { ZEN_ASSERT(Data != nullptr); if (Size > m_BufferSize) { Read(Data, Size, m_FileOffset); m_FileOffset += Size; return; } uint8_t* WritePtr = ((uint8_t*)Data); uint64_t Begin = m_FileOffset; uint64_t End = m_FileOffset + Size; ZEN_ASSERT(m_FileOffset >= m_BufferStart); if (m_FileOffset < m_BufferEnd) { ZEN_ASSERT(m_Buffer != nullptr); uint64_t Count = Min(m_BufferEnd, End) - m_FileOffset; memcpy(WritePtr + Begin - m_FileOffset, m_Buffer + Begin - m_BufferStart, Count); Begin += Count; if (Begin == End) { m_FileOffset = End; return; } } if (End == m_FileEnd) { Read(WritePtr + Begin - m_FileOffset, End - Begin, Begin); } else { if (!m_Buffer) { m_BufferSize = Min(m_FileEnd - m_FileOffset, m_BufferSize); m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow(m_BufferSize)); } m_BufferStart = Begin; m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); ZEN_ASSERT(Begin + Count == End); } m_FileOffset = End; } void BufferedReadFileStream::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) { const uint64_t MaxChunkSize = 512u * 1024u; std::error_code Ec; ReadFile(m_FileHandle, Data, BytesToRead, FileOffset, MaxChunkSize, Ec); if (Ec) { std::error_code DummyEc; throw std::system_error( Ec, fmt::format("HttpClient::BufferedReadFileStream ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", FileOffset, BytesToRead, PathFromHandle(m_FileHandle, DummyEc).generic_string(), m_FileSize)); } } CompositeBufferReadStream::CompositeBufferReadStream(const CompositeBuffer& Data, uint64_t BufferSize) : m_Data(Data) , m_BufferSize(BufferSize) , m_SegmentIndex(0) , m_BytesLeftInSegment(0) { } uint64_t CompositeBufferReadStream::Read(void* Data, uint64_t Size) { uint64_t Result = 0; uint8_t* WritePtr = (uint8_t*)Data; while ((Size > 0) && (m_SegmentIndex < m_Data.GetSegments().size())) { if (m_BytesLeftInSegment == 0) { const SharedBuffer& Segment = m_Data.GetSegments()[m_SegmentIndex]; IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Segment.AsIoBuffer().GetFileReference(FileRef)) { m_SegmentDiskBuffer = std::make_unique(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, m_BufferSize); } else { m_SegmentMemoryBuffer = Segment.GetView(); } m_BytesLeftInSegment = Segment.GetSize(); } uint64_t BytesToRead = Min(m_BytesLeftInSegment, Size); if (m_SegmentDiskBuffer) { m_SegmentDiskBuffer->Read(WritePtr, BytesToRead); } else { ZEN_ASSERT_SLOW(m_SegmentMemoryBuffer.GetSize() >= BytesToRead); memcpy(WritePtr, m_SegmentMemoryBuffer.GetData(), BytesToRead); m_SegmentMemoryBuffer.MidInline(BytesToRead); } WritePtr += BytesToRead; Size -= BytesToRead; Result += BytesToRead; m_BytesLeftInSegment -= BytesToRead; if (m_BytesLeftInSegment == 0) { m_SegmentDiskBuffer.reset(); m_SegmentMemoryBuffer.Reset(); m_SegmentIndex++; } } return Result; } } // namespace detail } // namespace zen #if ZEN_WITH_TESTS namespace zen { namespace testutil { IoHash HashComposite(const CompositeBuffer& Payload) { IoHashStream Hasher; const uint64_t PayloadSize = Payload.GetSize(); std::vector Buffer(64u * 1024u); detail::CompositeBufferReadStream Stream(Payload, 137u * 1024u); for (uint64_t Offset = 0; Offset < PayloadSize;) { uint64_t Count = Min(64u * 1024u, PayloadSize - Offset); Stream.Read(Buffer.data(), Count); Hasher.Append(Buffer.data(), Count); Offset += Count; } return Hasher.GetHash(); }; IoHash HashFileStream(void* FileHandle, uint64_t FileOffset, uint64_t FileSize) { IoHashStream Hasher; std::vector Buffer(64u * 1024u); detail::BufferedReadFileStream Stream(FileHandle, FileOffset, FileSize, 137u * 1024u); for (uint64_t Offset = 0; Offset < FileSize;) { uint64_t Count = Min(64u * 1024u, FileSize - Offset); Stream.Read(Buffer.data(), Count); Hasher.Append(Buffer.data(), Count); Offset += Count; } return Hasher.GetHash(); } } // namespace testutil TEST_CASE("BufferedReadFileStream") { ScopedTemporaryDirectory TmpDir; IoBuffer DiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer1"); IoBufferFileReference FileRef = {nullptr, 0, 0}; CHECK(DiskBuffer.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(DiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); IoBuffer Partial(DiskBuffer, 37 * 1024, 512 * 1024); CHECK(Partial.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(Partial), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); IoBuffer SmallDiskBuffer = WriteToTempFile(CompositeBuffer(CreateRandomBlob(63 * 1024)), TmpDir.Path() / "diskbuffer2"); CHECK(SmallDiskBuffer.GetFileReference(FileRef)); CHECK_EQ(IoHash::HashBuffer(SmallDiskBuffer), testutil::HashFileStream(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize)); } TEST_CASE("CompositeBufferReadStream") { ScopedTemporaryDirectory TmpDir; IoBuffer MemoryBuffer1 = CreateRandomBlob(64); CHECK_EQ(IoHash::HashBuffer(MemoryBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer1)))); IoBuffer MemoryBuffer2 = CreateRandomBlob(561 * 1024); CHECK_EQ(IoHash::HashBuffer(MemoryBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(MemoryBuffer2)))); IoBuffer DiskBuffer1 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(267 * 3 * 1024)), TmpDir.Path() / "diskbuffer1"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer1), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer1)))); IoBuffer DiskBuffer2 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(3 * 1024)), TmpDir.Path() / "diskbuffer2"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer2), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer2)))); IoBuffer DiskBuffer3 = WriteToTempFile(CompositeBuffer(CreateRandomBlob(496 * 5 * 1024)), TmpDir.Path() / "diskbuffer3"); CHECK_EQ(IoHash::HashBuffer(DiskBuffer3), testutil::HashComposite(CompositeBuffer(SharedBuffer(DiskBuffer3)))); CompositeBuffer Data(SharedBuffer(std::move(MemoryBuffer1)), SharedBuffer(std::move(DiskBuffer1)), SharedBuffer(std::move(DiskBuffer2)), SharedBuffer(std::move(MemoryBuffer2)), SharedBuffer(std::move(DiskBuffer3))); CHECK_EQ(IoHash::HashBuffer(Data), testutil::HashComposite(Data)); } } // namespace zen #endif