diff options
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/basicfile.cpp | 1033 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 5 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/basicfile.h | 185 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupiterclient.h | 57 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 152 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/logging/rotatingfilesink.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/packageformat.h | 164 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterclient.cpp | 29 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 505 | ||||
| -rw-r--r-- | src/zenutil/packageformat.cpp | 894 | ||||
| -rw-r--r-- | src/zenutil/xmake.lua | 2 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 4 |
13 files changed, 749 insertions, 2285 deletions
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp deleted file mode 100644 index 391c150c6..000000000 --- a/src/zenutil/basicfile.cpp +++ /dev/null @@ -1,1033 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zenutil/basicfile.h" - -#include <zencore/compactbinary.h> -#include <zencore/except.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/memory/memory.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#else -# include <fcntl.h> -# include <sys/file.h> -# include <sys/stat.h> -# include <unistd.h> -#endif - -#include <fmt/format.h> -#include <gsl/gsl-lite.hpp> - -namespace zen { - -BasicFile::~BasicFile() -{ - Close(); -} - -void -BasicFile::Open(const std::filesystem::path& FileName, Mode Mode) -{ - std::error_code Ec; - Open(FileName, Mode, Ec); - - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed to open file '{}', mode: {:x}", FileName, uint32_t(Mode))); - } -} - -void -BasicFile::Open(const std::filesystem::path& FileName, Mode InMode, std::error_code& Ec) -{ - Ec.clear(); - - Mode Mode = InMode & Mode::kModeMask; - -#if ZEN_PLATFORM_WINDOWS - DWORD dwCreationDisposition = 0; - DWORD dwDesiredAccess = 0; - switch (Mode) - { - case Mode::kRead: - dwCreationDisposition |= OPEN_EXISTING; - dwDesiredAccess |= GENERIC_READ; - break; - case Mode::kWrite: - dwCreationDisposition |= OPEN_ALWAYS; - dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE); - break; - case Mode::kDelete: - dwCreationDisposition |= OPEN_ALWAYS; - dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE); - break; - case Mode::kTruncate: - dwCreationDisposition |= CREATE_ALWAYS; - dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE); - break; - case Mode::kTruncateDelete: - dwCreationDisposition |= CREATE_ALWAYS; - dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE); - break; - } - - const DWORD dwShareMode = FILE_SHARE_READ | (EnumHasAllFlags(InMode, Mode::kPreventWrite) ? 0 : FILE_SHARE_WRITE) | - (EnumHasAllFlags(InMode, Mode::kPreventDelete) ? 0 : FILE_SHARE_DELETE); - const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; - const HANDLE hTemplateFile = nullptr; - const HANDLE FileHandle = CreateFile(FileName.c_str(), - dwDesiredAccess, - dwShareMode, - /* lpSecurityAttributes */ nullptr, - dwCreationDisposition, - dwFlagsAndAttributes, - hTemplateFile); - - if (FileHandle == INVALID_HANDLE_VALUE) - { - Ec = MakeErrorCodeFromLastError(); - - return; - } -#else - int OpenFlags = O_CLOEXEC; - switch (Mode) - { - case Mode::kRead: - OpenFlags |= O_RDONLY; - break; - case Mode::kWrite: - case Mode::kDelete: - OpenFlags |= (O_RDWR | O_CREAT); - break; - case Mode::kTruncate: - case Mode::kTruncateDelete: - OpenFlags |= (O_RDWR | O_CREAT | O_TRUNC); - break; - } - - int Fd = open(FileName.c_str(), OpenFlags, 0666); - if (Fd < 0) - { - Ec = MakeErrorCodeFromLastError(); - return; - } - if (Mode != Mode::kRead) - { - fchmod(Fd, 0666); - } - - void* FileHandle = (void*)(uintptr_t(Fd)); -#endif - - m_FileHandle = FileHandle; -} - -void -BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback) -{ - std::error_code Ec; - Open(FileName, Mode, Ec); - while (Ec && RetryCallback(Ec)) - { - Ec.clear(); - Open(FileName, Mode, Ec); - } - if (Ec) - { - throw std::system_error(Ec, fmt::format("failed to open file '{}', mode: {:x}", FileName, uint32_t(Mode))); - } -} - -void -BasicFile::Close() -{ - if (m_FileHandle) - { -#if ZEN_PLATFORM_WINDOWS - ::CloseHandle(m_FileHandle); -#else - int Fd = int(uintptr_t(m_FileHandle)); - close(Fd); -#endif - m_FileHandle = nullptr; - } -} - -IoBuffer -BasicFile::ReadRange(uint64_t FileOffset, uint64_t ByteCount) -{ - return IoBufferBuilder::MakeFromFileHandle(m_FileHandle, FileOffset, ByteCount); -} - -void -BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) -{ - const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - - while (BytesToRead) - { - const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize); - int32_t Error = 0; - size_t BytesRead = 0; - -#if ZEN_PLATFORM_WINDOWS - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(FileOffset >> 32); - - DWORD dwNumberOfBytesRead = 0; - BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl); - if (Success) - { - BytesRead = size_t(dwNumberOfBytesRead); - } - else - { - Error = zen::GetLastError(); - } -#else - static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); - int Fd = int(uintptr_t(m_FileHandle)); - ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset); - if (ReadResult != -1) - { - BytesRead = size_t(ReadResult); - } - else - { - Error = zen::GetLastError(); - } -#endif - - if (Error || (BytesRead != NumberOfBytesToRead)) - { - std::error_code DummyEc; - throw std::system_error(std::error_code(Error, std::system_category()), - fmt::format("ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})", - FileOffset, - NumberOfBytesToRead, - PathFromHandle(m_FileHandle, DummyEc), - FileSizeFromHandle(m_FileHandle))); - } - - BytesToRead -= NumberOfBytesToRead; - FileOffset += NumberOfBytesToRead; - Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead; - } -} - -IoBuffer -BasicFile::ReadAll() -{ - if (const uint64_t Size = FileSize()) - { - IoBuffer Buffer(Size); - Read(Buffer.MutableData(), Size, 0); - return Buffer; - } - else - { - return {}; - } -} - -void -BasicFile::StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) -{ - StreamByteRange(0, FileSize(), std::move(ChunkFun)); -} - -void -BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) -{ - const uint64_t ChunkSize = 128 * 1024; - IoBuffer ReadBuffer{ChunkSize}; - void* BufferPtr = ReadBuffer.MutableData(); - - uint64_t RemainBytes = Size; - uint64_t CurrentOffset = FileOffset; - - while (RemainBytes) - { - const uint64_t ThisChunkBytes = zen::Min(ChunkSize, RemainBytes); - - Read(BufferPtr, ThisChunkBytes, CurrentOffset); - - ChunkFun(BufferPtr, ThisChunkBytes); - - CurrentOffset += ThisChunkBytes; - RemainBytes -= ThisChunkBytes; - } -} - -uint64_t -BasicFile::Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec) -{ - uint64_t WrittenBytes = 0; - for (const SharedBuffer& Buffer : Data.GetSegments()) - { - MemoryView BlockView = Buffer.GetView(); - Write(BlockView, FileOffset + WrittenBytes, Ec); - - if (Ec) - { - return WrittenBytes; - } - - WrittenBytes += BlockView.GetSize(); - } - - return WrittenBytes; -} - -void -BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec) -{ - Write(Data.GetData(), Data.GetSize(), FileOffset, Ec); -} - -void -BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec) -{ - Ec.clear(); - - const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - - while (Size) - { - const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize); - -#if ZEN_PLATFORM_WINDOWS - OVERLAPPED Ovl{}; - - Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu); - Ovl.OffsetHigh = DWORD(FileOffset >> 32); - - DWORD dwNumberOfBytesWritten = 0; - - BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl); -#else - static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files"); - int Fd = int(uintptr_t(m_FileHandle)); - int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset); - bool Success = (BytesWritten > 0); -#endif - - if (!Success) - { - Ec = MakeErrorCodeFromLastError(); - - return; - } - - Size -= NumberOfBytesToWrite; - FileOffset += NumberOfBytesToWrite; - Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite; - } -} - -void -BasicFile::Write(MemoryView Data, uint64_t FileOffset) -{ - Write(Data.GetData(), Data.GetSize(), FileOffset); -} - -void -BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) -{ - std::error_code Ec; - Write(Data, Size, Offset, Ec); - - if (Ec) - { - std::error_code Dummy; - throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); - } -} - -void -BasicFile::WriteAll(IoBuffer Data, std::error_code& Ec) -{ - Write(Data.Data(), Data.Size(), 0, Ec); -} - -void -BasicFile::Flush() -{ - if (m_FileHandle == nullptr) - { - return; - } -#if ZEN_PLATFORM_WINDOWS - FlushFileBuffers(m_FileHandle); -#else - int Fd = int(uintptr_t(m_FileHandle)); - fsync(Fd); -#endif -} - -uint64_t -BasicFile::FileSize() const -{ -#if ZEN_PLATFORM_WINDOWS - ULARGE_INTEGER liFileSize; - liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); - if (liFileSize.LowPart == INVALID_FILE_SIZE) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); - } - } - return uint64_t(liFileSize.QuadPart); -#else - int Fd = int(uintptr_t(m_FileHandle)); - static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files"); - struct stat Stat; - if (fstat(Fd, &Stat) == -1) - { - std::error_code Dummy; - ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); - } - return uint64_t(Stat.st_size); -#endif -} - -uint64_t -BasicFile::FileSize(std::error_code& Ec) const -{ -#if ZEN_PLATFORM_WINDOWS - ULARGE_INTEGER liFileSize; - liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); - if (liFileSize.LowPart == INVALID_FILE_SIZE) - { - int Error = zen::GetLastError(); - if (Error) - { - Ec = MakeErrorCode(Error); - return 0; - } - } - return uint64_t(liFileSize.QuadPart); -#else - int Fd = int(uintptr_t(m_FileHandle)); - static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files"); - struct stat Stat; - if (fstat(Fd, &Stat) == -1) - { - Ec = MakeErrorCodeFromLastError(); - return 0; - } - return uint64_t(Stat.st_size); -#endif -} - -void -BasicFile::SetFileSize(uint64_t FileSize) -{ -#if ZEN_PLATFORM_WINDOWS - LARGE_INTEGER liFileSize; - liFileSize.QuadPart = FileSize; - BOOL OK = ::SetFilePointerEx(m_FileHandle, liFileSize, 0, FILE_BEGIN); - if (OK == FALSE) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, - fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); - } - } - OK = ::SetEndOfFile(m_FileHandle); - if (OK == FALSE) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, - fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); - } - } -#elif ZEN_PLATFORM_MAC - int Fd = int(intptr_t(m_FileHandle)); - if (ftruncate(Fd, (off_t)FileSize) < 0) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, - fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); - } - } -#else - int Fd = int(intptr_t(m_FileHandle)); - if (ftruncate64(Fd, (off64_t)FileSize) < 0) - { - int Error = zen::GetLastError(); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, - fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); - } - } - if (FileSize > 0) - { - int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize); - if (Error) - { - std::error_code Dummy; - ThrowSystemError(Error, - fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); - } - } -#endif -} - -void -BasicFile::Attach(void* Handle) -{ - ZEN_ASSERT(Handle != nullptr); - ZEN_ASSERT(m_FileHandle == nullptr); - m_FileHandle = Handle; -} - -void* -BasicFile::Detach() -{ - void* FileHandle = m_FileHandle; - m_FileHandle = 0; - return FileHandle; -} - -////////////////////////////////////////////////////////////////////////// - -TemporaryFile::~TemporaryFile() -{ - Close(); -} - -void -TemporaryFile::Close() -{ - if (m_FileHandle) - { -#if ZEN_PLATFORM_WINDOWS - // Mark file for deletion when final handle is closed - - FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE}; - - SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi); -#else - std::error_code Ec; - std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec); - if (!Ec) - { - unlink(FilePath.c_str()); - } -#endif - - BasicFile::Close(); - } -} - -void -TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec) -{ - StringBuilder<64> TempName; - Oid::NewOid().ToString(TempName); - - m_TempPath = TempDirName / TempName.c_str(); - - Open(m_TempPath, BasicFile::Mode::kTruncateDelete, Ec); -} - -void -TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec) -{ - // We intentionally call the base class Close() since otherwise we'll end up - // deleting the temporary file - BasicFile::Close(); - - std::filesystem::rename(m_TempPath, FinalFileName, Ec); - - if (Ec) - { - // Try to re-open the temp file so we clean up after us when TemporaryFile is destructed - std::error_code DummyEc; - Open(m_TempPath, BasicFile::Mode::kWrite, DummyEc); - } -} - -////////////////////////////////////////////////////////////////////////// - -void -TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data) -{ - TemporaryFile TempFile; - std::error_code Ec; - SafeWriteFile(Path, Data, Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to safely write file '{}'", Path)); - } -} - -void -TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc) -{ - TemporaryFile TempFile; - if (TempFile.CreateTemporary(Path.parent_path(), OutEc); !OutEc) - { - if (TempFile.Write(Data, 0, OutEc); !OutEc) - { - TempFile.MoveTemporaryIntoPlace(Path, OutEc); - } - } -} - -////////////////////////////////////////////////////////////////////////// - -LockFile::LockFile() -{ -} - -LockFile::~LockFile() -{ -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - int Fd = int(intptr_t(m_FileHandle)); - flock(Fd, LOCK_UN | LOCK_NB); -#endif -} - -void -LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec) -{ -#if ZEN_PLATFORM_WINDOWS - Ec.clear(); - - const DWORD dwCreationDisposition = CREATE_ALWAYS; - DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE | DELETE; - const DWORD dwShareMode = FILE_SHARE_READ; - const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE; - HANDLE hTemplateFile = nullptr; - - HANDLE FileHandle = CreateFile(FileName.c_str(), - dwDesiredAccess, - dwShareMode, - /* lpSecurityAttributes */ nullptr, - dwCreationDisposition, - dwFlagsAndAttributes, - hTemplateFile); - - if (FileHandle == INVALID_HANDLE_VALUE) - { - Ec = zen::MakeErrorCodeFromLastError(); - - return; - } -#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - int Fd = open(FileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); - if (Fd < 0) - { - Ec = zen::MakeErrorCodeFromLastError(); - return; - } - fchmod(Fd, 0666); - - int LockRet = flock(Fd, LOCK_EX | LOCK_NB); - if (LockRet < 0) - { - Ec = zen::MakeErrorCodeFromLastError(); - close(Fd); - return; - } - - void* FileHandle = (void*)uintptr_t(Fd); -#endif - - m_FileHandle = FileHandle; - - BasicFile::Write(Payload.GetBuffer(), 0, Ec); -} - -void -LockFile::Update(CbObject Payload, std::error_code& Ec) -{ - BasicFile::Write(Payload.GetBuffer(), 0, Ec); -} - -////////////////////////////////////////////////////////////////////////// - -BasicFileBuffer::BasicFileBuffer(BasicFile& Base, uint64_t BufferSize) -: m_Base(Base) -, m_Buffer(nullptr) -, m_BufferSize(BufferSize) -, m_Size(Base.FileSize()) -, m_BufferStart(0) -, m_BufferEnd(0) -{ - m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); -} - -BasicFileBuffer::~BasicFileBuffer() -{ - Memory::Free(m_Buffer); -} - -void -BasicFileBuffer::Read(void* Data, uint64_t Size, uint64_t FileOffset) -{ - if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size)) - { - m_Base.Read(Data, Size, FileOffset); - return; - } - uint8_t* WritePtr = ((uint8_t*)Data); - uint64_t Begin = FileOffset; - uint64_t End = FileOffset + Size; - if (FileOffset <= m_BufferStart) - { - if (End > m_BufferStart) - { - uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; - memcpy(WritePtr + End - Count - FileOffset, m_Buffer, Count); - End -= Count; - if (Begin == End) - { - return; - } - } - } - else if (FileOffset < m_BufferEnd) - { - uint64_t Count = Min(m_BufferEnd, End) - FileOffset; - memcpy(WritePtr + Begin - FileOffset, m_Buffer + Begin - m_BufferStart, Count); - Begin += Count; - if (Begin == End) - { - return; - } - } - m_BufferStart = Begin; - m_BufferEnd = Min(Begin + m_BufferSize, m_Size); - m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); - uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; - memcpy(WritePtr + Begin - FileOffset, m_Buffer, Count); - ZEN_ASSERT(Begin + Count == End); -} - -MemoryView -BasicFileBuffer::MakeView(uint64_t Size, uint64_t FileOffset) -{ - if (FileOffset < m_BufferStart || (FileOffset + Size) > m_BufferEnd) - { - if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size)) - { - return {}; - } - m_BufferStart = FileOffset; - m_BufferEnd = Min(m_BufferStart + m_BufferSize, m_Size); - m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); - } - return MemoryView(m_Buffer + (FileOffset - m_BufferStart), Size); -} - -////////////////////////////////////////////////////////////////////////// - -BasicFileWriter::BasicFileWriter(BasicFile& Base, uint64_t BufferSize) -: m_Base(Base) -, m_Buffer(nullptr) -, m_BufferSize(BufferSize) -, m_BufferStart(0) -, m_BufferEnd(0) -{ - m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize); -} - -BasicFileWriter::~BasicFileWriter() -{ - Flush(); - Memory::Free(m_Buffer); -} - -void -BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset) -{ - if (m_Buffer == nullptr || (Size >= m_BufferSize)) - { - m_Base.Write(Data, Size, FileOffset); - return; - } - - // Note that this only supports buffering of sequential writes! - - if (FileOffset != m_BufferEnd) - { - Flush(); - m_BufferStart = m_BufferEnd = FileOffset; - } - - const uint8_t* DataPtr = (const uint8_t*)Data; - while (Size) - { - const uint64_t RemainingBufferCapacity = m_BufferStart + m_BufferSize - m_BufferEnd; - const uint64_t BlockWriteBytes = Min(RemainingBufferCapacity, Size); - const uint64_t BufferWriteOffset = FileOffset - m_BufferStart; - - ZEN_ASSERT_SLOW(BufferWriteOffset < m_BufferSize); - ZEN_ASSERT_SLOW((BufferWriteOffset + BlockWriteBytes) <= m_BufferSize); - - memcpy(m_Buffer + BufferWriteOffset, DataPtr, BlockWriteBytes); - - Size -= BlockWriteBytes; - m_BufferEnd += BlockWriteBytes; - FileOffset += BlockWriteBytes; - DataPtr += BlockWriteBytes; - - if ((m_BufferEnd - m_BufferStart) == m_BufferSize) - { - Flush(); - } - } -} - -void -BasicFileWriter::Flush() -{ - const uint64_t BufferedBytes = m_BufferEnd - m_BufferStart; - - if (BufferedBytes == 0) - return; - - const uint64_t WriteOffset = m_BufferStart; - m_BufferStart = m_BufferEnd; - - m_Base.Write(m_Buffer, BufferedBytes, WriteOffset); -} - -////////////////////////////////////////////////////////////////////////// - -/* - ___________ __ - \__ ___/___ _______/ |_ ______ - | |_/ __ \ / ___/\ __\/ ___/ - | |\ ___/ \___ \ | | \___ \ - |____| \___ >____ > |__| /____ > - \/ \/ \/ -*/ - -#if ZEN_WITH_TESTS - -TEST_CASE("BasicFile") -{ - ScopedCurrentDirectoryChange _; - - BasicFile File1; - CHECK_THROWS(File1.Open("zonk", BasicFile::Mode::kRead)); - CHECK_NOTHROW(File1.Open("zonk", BasicFile::Mode::kTruncate)); - CHECK_NOTHROW(File1.Write("abcd", 4, 0)); - CHECK(File1.FileSize() == 4); - { - IoBuffer Data = File1.ReadAll(); - CHECK(Data.Size() == 4); - CHECK_EQ(memcmp(Data.Data(), "abcd", 4), 0); - } - CHECK_NOTHROW(File1.Write("efgh", 4, 2)); - CHECK(File1.FileSize() == 6); - { - IoBuffer Data = File1.ReadAll(); - CHECK(Data.Size() == 6); - CHECK_EQ(memcmp(Data.Data(), "abefgh", 6), 0); - } -} - -TEST_CASE("TemporaryFile") -{ - ScopedCurrentDirectoryChange _; - - SUBCASE("DeleteOnClose") - { - std::filesystem::path Path; - { - TemporaryFile TmpFile; - std::error_code Ec; - TmpFile.CreateTemporary(std::filesystem::current_path(), Ec); - CHECK(!Ec); - Path = TmpFile.GetPath(); - CHECK(std::filesystem::exists(Path)); - } - CHECK(std::filesystem::exists(Path) == false); - } - - SUBCASE("MoveIntoPlace") - { - TemporaryFile TmpFile; - std::error_code Ec; - TmpFile.CreateTemporary(std::filesystem::current_path(), Ec); - CHECK(!Ec); - std::filesystem::path TempPath = TmpFile.GetPath(); - std::filesystem::path FinalPath = std::filesystem::current_path() / "final"; - CHECK(std::filesystem::exists(TempPath)); - TmpFile.MoveTemporaryIntoPlace(FinalPath, Ec); - CHECK(!Ec); - CHECK(std::filesystem::exists(TempPath) == false); - CHECK(std::filesystem::exists(FinalPath)); - } -} - -TEST_CASE("BasicFileBuffer") -{ - ScopedCurrentDirectoryChange _; - { - BasicFile File1; - const std::string_view Data = "0123456789abcdef"; - File1.Open("buffered", BasicFile::Mode::kTruncate); - for (uint32_t I = 0; I < 16; ++I) - { - File1.Write(Data.data(), Data.size(), I * Data.size()); - } - } - SUBCASE("EvenBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - // Non-primed - { - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 1 * 16); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "0123456789abcdef"); - } - // Primed - { - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 1 * 16); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "0123456789abcdef"); - } - } - SUBCASE("UnevenBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - // Non-primed - { - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 7); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "789abcdef0123456"); - } - // Primed - { - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 7); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "789abcdef0123456"); - } - } - SUBCASE("BiggerThanBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[17] = {0}; - File1Buffer.Read(Buffer, 17, 0 * 16); - std::string_view Verify(Buffer, 17); - CHECK(Verify == "0123456789abcdef0"); - } - SUBCASE("InsideBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 0 * 16); - - File1Buffer.Read(Buffer, 8, 2); - std::string_view Verify(Buffer, 8); - CHECK(Verify == "23456789"); - } - SUBCASE("BeginningOfBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 8); - - File1Buffer.Read(Buffer, 8, 8); - std::string_view Verify(Buffer, 8); - CHECK(Verify == "89abcdef"); - } - SUBCASE("EndOfBuffer") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 0 * 16); - - File1Buffer.Read(Buffer, 8, 8); - std::string_view Verify(Buffer, 8); - CHECK(Verify == "89abcdef"); - } - SUBCASE("OverEnd") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 0 * 16); - - File1Buffer.Read(Buffer, 16, 8); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "89abcdef01234567"); - } - SUBCASE("OverBegin") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 1 * 16); - - File1Buffer.Read(Buffer, 16, 8); - std::string_view Verify(Buffer, 16); - CHECK(Verify == "89abcdef01234567"); - } - SUBCASE("EndOfFile") - { - BasicFile File1; - File1.Open("buffered", BasicFile::Mode::kRead); - BasicFileBuffer File1Buffer(File1, 16); - char Buffer[16] = {0}; - File1Buffer.Read(Buffer, 16, 0 * 16); - - File1Buffer.Read(Buffer, 8, 256 - 8); - std::string_view Verify(Buffer, 8); - CHECK(Verify == "89abcdef"); - } -} - -void -basicfile_forcelink() -{ -} - -#endif - -} // namespace zen diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 9bef4d1a4..1f951167d 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -1,5 +1,8 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenutil/cache/rpcrecording.h> + +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -8,8 +11,6 @@ #include <zencore/system.h> #include <zencore/testing.h> #include <zencore/testutils.h> -#include <zenutil/basicfile.h> -#include <zenutil/cache/rpcrecording.h> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h deleted file mode 100644 index 03c5605df..000000000 --- a/src/zenutil/include/zenutil/basicfile.h +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/zencore.h> - -#include <zencore/compositebuffer.h> -#include <zencore/enumflags.h> -#include <zencore/iobuffer.h> - -#include <filesystem> -#include <functional> - -namespace zen { - -class CbObject; - -/** - * Probably the most basic file abstraction in the universe - * - * One thing of note is that there is no notion of a "current file position" - * in this API -- all reads and writes are done from explicit offsets in - * the file. This avoids concurrency issues which can occur otherwise. - * - */ - -class BasicFile -{ -public: - BasicFile() = default; - ~BasicFile(); - - BasicFile(const BasicFile&) = delete; - BasicFile& operator=(const BasicFile&) = delete; - - enum class Mode : uint32_t - { - kRead = 0, // Opens a existing file for read only - kWrite = 1, // Opens (or creates) a file for read and write - kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero - kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set - kTruncateDelete = - 4, // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set - kModeMask = 0x0007, - kPreventDelete = 0x1000'0000, // Do not open with delete sharing mode (prevent other processes from deleting file while open) - kPreventWrite = 0x2000'0000, // Do not open with write sharing mode (prevent other processes from writing to file while open) - }; - - void Open(const std::filesystem::path& FileName, Mode Mode); - void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec); - void Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback); - void Close(); - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - IoBuffer ReadRange(uint64_t FileOffset, uint64_t ByteCount); - void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); - void Write(MemoryView Data, uint64_t FileOffset); - void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec); - uint64_t Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec); - void Flush(); - [[nodiscard]] uint64_t FileSize() const; - [[nodiscard]] uint64_t FileSize(std::error_code& Ec) const; - void SetFileSize(uint64_t FileSize); - IoBuffer ReadAll(); - void WriteAll(IoBuffer Data, std::error_code& Ec); - void Attach(void* Handle); - void* Detach(); - - inline void* Handle() { return m_FileHandle; } - bool IsOpen() const { return m_FileHandle != nullptr; } - -protected: - void* m_FileHandle = nullptr; // This is either null or valid -private: -}; - -ENUM_CLASS_FLAGS(BasicFile::Mode); - -/** - * Simple abstraction for a temporary file - * - * Works like a regular BasicFile but implements a simple mechanism to allow creating - * a temporary file for writing in a directory which may later be moved atomically - * into the intended location after it has been fully written to. - * - */ - -class TemporaryFile : public BasicFile -{ -public: - TemporaryFile() = default; - ~TemporaryFile(); - - TemporaryFile(const TemporaryFile&) = delete; - TemporaryFile& operator=(const TemporaryFile&) = delete; - - void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec); - void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec); - const std::filesystem::path& GetPath() const { return m_TempPath; } - - static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data); - static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc); - -private: - void Close(); - std::filesystem::path m_TempPath; - - using BasicFile::Open; -}; - -/** Lock file abstraction - - */ - -class LockFile : protected BasicFile -{ -public: - LockFile(); - ~LockFile(); - - void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec); - void Update(CbObject Payload, std::error_code& Ec); - -private: -}; - -/** Adds a layer of buffered reading to a BasicFile - -This class is not intended for concurrent access, it is not thread safe. - -*/ - -class BasicFileBuffer -{ -public: - BasicFileBuffer(BasicFile& Base, uint64_t BufferSize); - ~BasicFileBuffer(); - - void Read(void* Data, uint64_t Size, uint64_t FileOffset); - MemoryView MakeView(uint64_t Size, uint64_t FileOffset); - - template<typename T> - const T* MakeView(uint64_t FileOffset) - { - MemoryView View = MakeView(sizeof(T), FileOffset); - return reinterpret_cast<const T*>(View.GetData()); - } - -private: - BasicFile& m_Base; - uint8_t* m_Buffer; - const uint64_t m_BufferSize; - uint64_t m_Size; - uint64_t m_BufferStart; - uint64_t m_BufferEnd; -}; - -/** Adds a layer of buffered writing to a BasicFile - -This class is not intended for concurrent access, it is not thread safe. - -*/ - -class BasicFileWriter -{ -public: - BasicFileWriter(BasicFile& Base, uint64_t BufferSize); - ~BasicFileWriter(); - - void Write(const void* Data, uint64_t Size, uint64_t FileOffset); - void Flush(); - -private: - BasicFile& m_Base; - uint8_t* m_Buffer; - const uint64_t m_BufferSize; - uint64_t m_BufferStart; - uint64_t m_BufferEnd; -}; - -ZENCORE_API void basicfile_forcelink(); - -} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h new file mode 100644 index 000000000..defe50edc --- /dev/null +++ b/src/zenutil/include/zenutil/jupiter/jupiterclient.h @@ -0,0 +1,57 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <chrono> + +namespace zen { + +class IoBuffer; + +struct JupiterClientOptions +{ + std::string_view Name; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; +}; + +/** + * Jupiter upstream cache client + */ +class JupiterClient : public RefCounted +{ +public: + JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider); + ~JupiterClient(); + + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } + + LoggerRef Logger() { return m_Log; } + HttpClient& Client() { return m_HttpClient; } + +private: + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + std::function<HttpClientAccessToken()> m_TokenProvider; + HttpClient m_HttpClient; + + friend class JupiterSession; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h new file mode 100644 index 000000000..6a80332f4 --- /dev/null +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -0,0 +1,152 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <set> + +namespace zen { + +class IoBuffer; + +struct JupiterResult +{ + IoBuffer Response; + uint64_t SentBytes{}; + uint64_t ReceivedBytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; + std::string Reason; + bool Success = false; +}; + +struct PutRefResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeRefResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +struct JupiterExistsResult : JupiterResult +{ + std::set<IoHash> Needs; +}; + +struct GetObjectReferencesResult : JupiterResult +{ + std::set<IoHash> References; +}; + +struct PutBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class JupiterSession +{ +public: + JupiterSession(LoggerRef InLog, HttpClient& InHttpClient); + ~JupiterSession(); + + JupiterResult Authenticate(); + + JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key); + JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); + JupiterResult GetObject(std::string_view Namespace, const IoHash& Key); + JupiterResult GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath = {}); + + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); + JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); + + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + + JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); + + JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key); + + JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); + + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + JupiterResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + JupiterResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath); + JupiterResult PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload); + FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + +private: + inline LoggerRef Log() { return m_Log; } + + JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); + + JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); + + LoggerRef m_Log; + HttpClient& m_HttpClient; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 3eb9021dd..758722156 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -2,8 +2,8 @@ #pragma once +#include <zencore/basicfile.h> #include <zencore/memory/llm.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <spdlog/details/log_msg.h> diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h deleted file mode 100644 index c90b840da..000000000 --- a/src/zenutil/include/zenutil/packageformat.h +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/compactbinarypackage.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> - -#include <functional> -#include <gsl/gsl-lite.hpp> - -namespace zen { - -class IoBuffer; -class CbPackage; -class CompositeBuffer; - -/** _____ _ _____ _ - / ____| | | __ \ | | - | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___ - | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \ - | |____| |_) | | | (_| | (__| < (_| | (_| | __/ - \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___| - __/ | - |___/ - - Structures and code related to handling CbPackage transactions - - CbPackage instances are marshaled across the wire using a distinct message - format. We don't use the CbPackage serialization format provided by the - CbPackage implementation itself since that does not provide much flexibility - in how the attachment payloads are transmitted. The scheme below separates - metadata cleanly from payloads and this enables us to more efficiently - transmit them either via sendfile/TransmitFile like mechanisms, or by - reference/memory mapping in the local case. - */ - -struct CbPackageHeader -{ - uint32_t HeaderMagic; - uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document? - uint32_t Reserved1; - uint32_t Reserved2; -}; - -static_assert(sizeof(CbPackageHeader) == 16); - -enum : uint32_t -{ - kCbPkgMagic = 0xaa77aacc -}; - -struct CbAttachmentEntry -{ - uint64_t PayloadSize; // Size of the associated payload data in the message - uint32_t Flags; // See flags below - IoHash AttachmentHash; // Content Id for the attachment - - enum - { - kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format - kIsObject = (1u << 1), // Is compact binary object - kIsError = (1u << 2), // Is error (compact binary formatted) object - kIsLocalRef = (1u << 3), // Is "local reference" - }; -}; - -struct CbAttachmentReferenceHeader -{ - uint64_t PayloadByteOffset = 0; - uint64_t PayloadByteSize = ~0u; - uint16_t AbsolutePathLength = 0; - - // This header will be followed by UTF8 encoded absolute path to backing file -}; - -static_assert(sizeof(CbAttachmentEntry) == 32); - -enum class FormatFlags -{ - kDefault = 0, - kAllowLocalReferences = (1u << 0), - kDenyPartialLocalReferences = (1u << 1) -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags); - -enum class RpcAcceptOptions : uint16_t -{ - kNone = 0, - kAllowLocalReferences = (1u << 0), - kAllowPartialLocalReferences = (1u << 1), - kAllowPartialCacheChunks = (1u << 2) -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions); - -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr); -CbPackage ParsePackageMessage( - IoBuffer Payload, - std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { - return IoBuffer{Size}; - }); -bool IsPackageMessage(IoBuffer Payload); - -bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage); - -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle = nullptr); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle = nullptr); - -/** Streaming reader for compact binary packages - - The goal is to ultimately support zero-copy I/O, but for now there'll be some - copying involved on some platforms at least. - - This approach to deserializing CbPackage data is more efficient than - `ParsePackageMessage` since it does not require the entire message to - be resident in a memory buffer - - */ -class CbPackageReader -{ -public: - CbPackageReader(); - ~CbPackageReader(); - - void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer); - - /** Process compact binary package data stream - - The data stream must be in the serialization format produced by FormatPackageMessage - - \return How many bytes must be fed to this function in the next call - */ - uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes); - - void Finalize(); - const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; } - CbObject GetRootObject() { return m_RootObject; } - std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; } - -private: - enum class State - { - kInitialState, - kReadingHeader, - kReadingAttachmentEntries, - kReadingBuffers - } m_CurrentState = State::kInitialState; - - std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer; - std::vector<IoBuffer> m_PayloadBuffers; - std::vector<CbAttachmentEntry> m_AttachmentEntries; - std::vector<CbAttachment> m_Attachments; - CbObject m_RootObject; - CbPackageHeader m_PackageHeader; - - IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer); -}; - -void forcelink_packageformat(); - -} // namespace zen diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp new file mode 100644 index 000000000..5e5da3750 --- /dev/null +++ b/src/zenutil/jupiter/jupiterclient.cpp @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter/jupiterclient.h> + +namespace zen { + +using namespace std::literals; + +JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider) +: m_Log(zen::logging::Get("jupiter"sv)) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) +, m_TokenProvider(std::move(TokenProvider)) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) +{ +} + +JupiterClient::~JupiterClient() +{ +} + +} // namespace zen diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp new file mode 100644 index 000000000..f706a7efc --- /dev/null +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -0,0 +1,505 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter/jupitersession.h> + +#include <zencore/compactbinary.h> +#include <zencore/fmtutils.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +//#include <cpr/cpr.h> +//#include <fmt/format.h> +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen { + +namespace detail { + JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + if (!Response.IsSuccess()) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + return {.Response = Response.ResponsePayload, + .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = 0, + .Success = true}; + } +} // namespace detail + +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) +{ +} + +JupiterSession::~JupiterSession() +{ +} + +JupiterResult +JupiterSession::Authenticate() +{ + bool OK = m_HttpClient.Authenticate(); + return {.Success = OK}; +} + +JupiterResult +JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::GetRef"); + + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); + + return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); +} + +JupiterResult +JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); + + JupiterResult Result = detail::ConvertResponse(Response); + + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) + { + const std::string& PayloadHashHeader = It->second; + if (PayloadHashHeader.length() == IoHash::StringLength) + { + OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); + } + } + + return Result; +} + +JupiterResult +JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObject"); + + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + return detail::ConvertResponse(Response); +} + +PutRefResult +JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::PutRef"); + + Ref.SetContentType(RefType); + + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + Ref, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutRefResult Result = {detail::ConvertResponse(Response)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +FinalizeRefResult +JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + + FinalizeRefResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutBlob"); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) +{ + ZEN_TRACE_CPU("JupiterClient::PutObject"); + + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::RefExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +GetObjectReferencesResult +JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ReferencesResponse = Response.AsObject(); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "blobs"sv, Key); +} + +JupiterResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); +} + +JupiterResult +JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "objects"sv, Key); +} + +JupiterExistsResult +JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "objects"sv, Keys); +} + +std::vector<IoHash> +JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +{ + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; + + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); + + return {}; +} + +JupiterResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +JupiterExistsResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Body; + Body << "["; + for (const auto& Key : Keys) + { + Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; + } + Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); + + JupiterExistsResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ExistsResponse = Response.AsObject(); + for (auto& Item : ExistsResponse["needs"sv]) + { + Result.Needs.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); +} + +JupiterResult +JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); +} + +JupiterResult +JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); + return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); +} + +PutBuildPartResult +JupiterSession::PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + + IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), + Payload, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +JupiterResult +JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); +} + +JupiterResult +JupiterSession::PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload) +{ + HttpClient::Response Response = m_HttpClient.Upload( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload, + ContentType); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath) +{ + HttpClient::Response Response = m_HttpClient.Download( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + TempFolderPath); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); +} + +JupiterResult +JupiterSession::PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Put( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); +} + +FinalizeBuildPartResult +JupiterSession::FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash) +{ + HttpClient::Response Response = m_HttpClient.Post( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), + HttpClient::Accept(ZenContentType::kCbObject)); + + FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); +} + +} // namespace zen diff --git a/src/zenutil/packageformat.cpp b/src/zenutil/packageformat.cpp deleted file mode 100644 index 579e0d13c..000000000 --- a/src/zenutil/packageformat.cpp +++ /dev/null @@ -1,894 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/packageformat.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compositebuffer.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/scopeguard.h> -#include <zencore/stream.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> -#include <zencore/trace.h> - -#include <span> -#include <vector> - -#if ZEN_PLATFORM_WINDOWS -# include <zencore/windows.h> -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -const std::string_view HandlePrefix(":?#:"); - -std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle) -{ - return FormatPackageMessage(Data, FormatFlags::kDefault, TargetProcessHandle); -} -CompositeBuffer -FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle) -{ - return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessHandle); -} - -CompositeBuffer -FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) -{ - return CompositeBuffer(FormatPackageMessage(Data, Flags, TargetProcessHandle)); -} - -static void -MarshalLocal(CbAttachmentEntry*& AttachmentInfo, - const std::string& Path8, - CbAttachmentReferenceHeader& LocalRef, - const IoHash& AttachmentHash, - bool IsCompressed, - std::vector<IoBuffer>& ResponseBuffers) -{ - IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size()); - - CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>(); - *RefHdr++ = LocalRef; - memcpy(RefHdr, Path8.data(), Path8.size()); - - *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), - .Flags = (IsCompressed ? uint32_t(CbAttachmentEntry::kIsCompressed) : 0u) | CbAttachmentEntry::kIsLocalRef, - .AttachmentHash = AttachmentHash}; - - ResponseBuffers.emplace_back(std::move(RefBuffer)); -}; - -static bool -IsLocalRef(tsl::robin_map<void*, std::string>& FileNameMap, - std::vector<void*>& DuplicatedHandles, - const CompositeBuffer& AttachmentBinary, - bool DenyPartialLocalReferences, - void* TargetProcessHandle, - CbAttachmentReferenceHeader& LocalRef, - std::string& Path8) -{ - const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); - IoBufferFileReference Ref; - const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); - - if (!SegmentBuffer.GetFileReference(Ref)) - { - return false; - } - - if (DenyPartialLocalReferences && !SegmentBuffer.IsWholeFile()) - { - return false; - } - - if (auto It = FileNameMap.find(Ref.FileHandle); It != FileNameMap.end()) - { - Path8 = It->second; - } - else - { - bool UseFilePath = true; -#if ZEN_PLATFORM_WINDOWS - if (TargetProcessHandle != nullptr) - { - HANDLE TargetHandle = INVALID_HANDLE_VALUE; - BOOL OK = ::DuplicateHandle(GetCurrentProcess(), - Ref.FileHandle, - (HANDLE)TargetProcessHandle, - &TargetHandle, - FILE_GENERIC_READ, - FALSE, - 0); - if (OK) - { - DuplicatedHandles.push_back((void*)TargetHandle); - Path8 = fmt::format("{}{}", HandlePrefix, reinterpret_cast<uint64_t>(TargetHandle)); - UseFilePath = false; - } - } -#else // ZEN_PLATFORM_WINDOWS - ZEN_UNUSED(TargetProcessHandle); - ZEN_UNUSED(DuplicatedHandles); - // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes and to - // deal with access rights etc. -#endif // ZEN_PLATFORM_WINDOWS - if (UseFilePath) - { - ExtendablePathBuilder<256> LocalRefFile; - std::error_code Ec; - std::filesystem::path FilePath = PathFromHandle(Ref.FileHandle, Ec); - if (Ec) - { - ZEN_WARN("Failed to get path for file handle {} in IsLocalRef check, reason '{}'", Ref.FileHandle, Ec.message()); - return false; - } - LocalRefFile.Append(std::filesystem::absolute(FilePath)); - Path8 = LocalRefFile.ToUtf8(); - } - FileNameMap.insert_or_assign(Ref.FileHandle, Path8); - } - - LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size()); - LocalRef.PayloadByteOffset = Ref.FileChunkOffset; - LocalRef.PayloadByteSize = Ref.FileChunkSize; - - return true; -}; - -std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle) -{ - ZEN_TRACE_CPU("FormatPackageMessage"); - - std::vector<void*> DuplicatedHandles; -#if ZEN_PLATFORM_WINDOWS - auto _ = MakeGuard([&DuplicatedHandles, &TargetProcessHandle]() { - if (TargetProcessHandle == nullptr) - { - return; - } - - for (void* DuplicatedHandle : DuplicatedHandles) - { - HANDLE ClosingHandle; - if (::DuplicateHandle((HANDLE)TargetProcessHandle, - (HANDLE)DuplicatedHandle, - GetCurrentProcess(), - &ClosingHandle, - 0, - FALSE, - DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS) == TRUE) - { - ::CloseHandle(ClosingHandle); - } - } - }); -#endif // ZEN_PLATFORM_WINDOWS - - const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); - std::vector<IoBuffer> ResponseBuffers; - - ResponseBuffers.reserve(2 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each - // attachment is likely to consist of several buffers - - IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbPackageHeader) + sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; - MutableMemoryView HeaderView = AttachmentMetadataBuffer.GetMutableView(); - // Fixed size header - - CbPackageHeader* Hdr = (CbPackageHeader*)HeaderView.GetData(); - *Hdr = {.HeaderMagic = kCbPkgMagic, .AttachmentCount = gsl::narrow<uint32_t>(Attachments.size())}; - HeaderView.MidInline(sizeof(CbPackageHeader)); - - // Attachment metadata array - CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(HeaderView.GetData()); - ResponseBuffers.emplace_back(std::move(AttachmentMetadataBuffer)); // Attachment metadata - - // Root object - - IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); - ZEN_ASSERT(RootIoBuffer.GetSize() > 0); - *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()}; - ResponseBuffers.emplace_back(std::move(RootIoBuffer)); // Root object - - // Attachment payloads - tsl::robin_map<void*, std::string> FileNameMap; - - for (const CbAttachment& Attachment : Attachments) - { - if (Attachment.IsNull()) - { - ZEN_NOT_IMPLEMENTED("Null attachments are not supported"); - } - else if (const CompressedBuffer& AttachmentBuffer = Attachment.AsCompressedBinary()) - { - const CompositeBuffer& Compressed = AttachmentBuffer.GetCompressed(); - IoHash AttachmentHash = Attachment.GetHash(); - - // If the data is either not backed by a file, or there are multiple - // fragments then we cannot marshal it by local reference. We might - // want/need to extend this in the future to allow multiple chunk - // segments to be marshaled at once - - bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); - CbAttachmentReferenceHeader LocalRef; - std::string Path8; - - if (MarshalByLocalRef) - { - MarshalByLocalRef = IsLocalRef(FileNameMap, - DuplicatedHandles, - Compressed, - DenyPartialLocalReferences, - TargetProcessHandle, - LocalRef, - Path8); - } - - if (MarshalByLocalRef) - { - const bool IsCompressed = true; - bool IsHandle = false; -#if ZEN_PLATFORM_WINDOWS - IsHandle = Path8.starts_with(HandlePrefix); -#endif - MarshalLocal(AttachmentInfo, Path8, LocalRef, AttachmentHash, IsCompressed, ResponseBuffers); - ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", Compressed.GetSize()); - } - else - { - *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), - .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = AttachmentHash}; - - std::span<const SharedBuffer> Segments = Compressed.GetSegments(); - ResponseBuffers.reserve(ResponseBuffers.size() + Segments.size() - 1); - for (const SharedBuffer& Segment : Segments) - { - ZEN_ASSERT(Segment.GetSize() > 0); - ResponseBuffers.emplace_back(Segment.AsIoBuffer()); - } - } - } - else if (CbObject AttachmentObject = Attachment.AsObject()) - { - IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); - ZEN_ASSERT(ObjIoBuffer.GetSize() > 0); - ResponseBuffers.emplace_back(std::move(ObjIoBuffer)); - - *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), - .Flags = CbAttachmentEntry::kIsObject, - .AttachmentHash = Attachment.GetHash()}; - } - else if (const CompositeBuffer& AttachmentBinary = Attachment.AsCompositeBinary()) - { - IoHash AttachmentHash = Attachment.GetHash(); - bool MarshalByLocalRef = - EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); - bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); - - CbAttachmentReferenceHeader LocalRef; - std::string Path8; - - if (MarshalByLocalRef) - { - MarshalByLocalRef = IsLocalRef(FileNameMap, - DuplicatedHandles, - AttachmentBinary, - DenyPartialLocalReferences, - TargetProcessHandle, - LocalRef, - Path8); - } - - if (MarshalByLocalRef) - { - const bool IsCompressed = false; - bool IsHandle = false; -#if ZEN_PLATFORM_WINDOWS - IsHandle = Path8.starts_with(HandlePrefix); -#endif - MarshalLocal(AttachmentInfo, Path8, LocalRef, AttachmentHash, IsCompressed, ResponseBuffers); - ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", AttachmentBinary.GetSize()); - } - else - { - *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; - - std::span<const SharedBuffer> Segments = AttachmentBinary.GetSegments(); - ResponseBuffers.reserve(ResponseBuffers.size() + Segments.size() - 1); - for (const SharedBuffer& Segment : Segments) - { - ZEN_ASSERT(Segment.GetSize() > 0); - ResponseBuffers.emplace_back(Segment.AsIoBuffer()); - } - } - } - else - { - ZEN_NOT_IMPLEMENTED("Unknown attachment kind"); - } - } - FileNameMap.clear(); -#if ZEN_PLATFORM_WINDOWS - DuplicatedHandles.clear(); -#endif // ZEN_PLATFORM_WINDOWS - - return ResponseBuffers; -} - -bool -IsPackageMessage(IoBuffer Payload) -{ - if (Payload.GetSize() < sizeof(CbPackageHeader)) - { - return false; - } - - BinaryReader Reader(Payload); - const CbPackageHeader* Hdr = reinterpret_cast<const CbPackageHeader*>(Reader.GetView(sizeof(CbPackageHeader)).GetData()); - if (Hdr->HeaderMagic != kCbPkgMagic) - { - return false; - } - - return true; -} - -CbPackage -ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer) -{ - ZEN_TRACE_CPU("ParsePackageMessage"); - - if (Payload.GetSize() < sizeof(CbPackageHeader)) - { - throw std::invalid_argument(fmt::format("invalid CbPackage, missing complete header (size {})", Payload.GetSize())); - } - - BinaryReader Reader(Payload); - - const CbPackageHeader* Hdr = reinterpret_cast<const CbPackageHeader*>(Reader.GetView(sizeof(CbPackageHeader)).GetData()); - if (Hdr->HeaderMagic != kCbPkgMagic) - { - throw std::invalid_argument( - fmt::format("invalid CbPackage header magic, expected {0:x}, got {0:x}", static_cast<uint32_t>(kCbPkgMagic), Hdr->HeaderMagic)); - } - Reader.Skip(sizeof(CbPackageHeader)); - - const uint32_t ChunkCount = Hdr->AttachmentCount + 1; - - if (Reader.Remaining() < sizeof(CbAttachmentEntry) * ChunkCount) - { - throw std::invalid_argument(fmt::format("invalid CbPackage, missing attachment entry data (need {} bytes, have {} bytes)", - sizeof(CbAttachmentEntry) * ChunkCount, - Reader.Remaining())); - } - const CbAttachmentEntry* AttachmentEntries = - reinterpret_cast<const CbAttachmentEntry*>(Reader.GetView(sizeof(CbAttachmentEntry) * ChunkCount).GetData()); - Reader.Skip(sizeof(CbAttachmentEntry) * ChunkCount); - - CbPackage Package; - - std::vector<CbAttachment> Attachments; - Attachments.reserve(ChunkCount); // Guessing here... - - tsl::robin_map<std::string, IoBuffer> PartialFileBuffers; - - std::vector<std::pair<uint32_t, std::string>> MalformedAttachments; - - for (uint32_t i = 0; i < ChunkCount; ++i) - { - const CbAttachmentEntry& Entry = AttachmentEntries[i]; - const uint64_t AttachmentSize = Entry.PayloadSize; - - if (Reader.Remaining() < AttachmentSize) - { - throw std::invalid_argument(fmt::format("invalid CbPackage, missing attachment data (need {} bytes, have {} bytes)", - AttachmentSize, - Reader.Remaining())); - } - const IoBuffer AttachmentBuffer(Payload, Reader.CurrentOffset(), AttachmentSize); - Reader.Skip(AttachmentSize); - - if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) - { - // Marshal local reference - a "pointer" to the chunk backing file - - ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); - - const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); - const char* PathPointer = reinterpret_cast<const char*>(AttachRefHdr + 1); - - ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); - std::string_view PathView(PathPointer, AttachRefHdr->AbsolutePathLength); - - IoBuffer FullFileBuffer; - - std::filesystem::path Path(Utf8ToWide(PathView)); - if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end()) - { - FullFileBuffer = It->second; - } - else - { - if (PathView.starts_with(HandlePrefix)) - { -#if ZEN_PLATFORM_WINDOWS - std::string_view HandleString(PathView.substr(HandlePrefix.length())); - std::optional<uint64_t> HandleNumber(ParseInt<uint64_t>(HandleString)); - if (HandleNumber.has_value()) - { - HANDLE FileHandle = HANDLE(HandleNumber.value()); - ULARGE_INTEGER liFileSize; - liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart); - if (liFileSize.LowPart != INVALID_FILE_SIZE) - { - FullFileBuffer = - IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart), /*IsWholeFile*/ true); - PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer); - } - } -#else // ZEN_PLATFORM_WINDOWS - // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes - // and to deal with acceess rights etc. - ZEN_ASSERT(false); -#endif // ZEN_PLATFORM_WINDOWS - } - else - { - FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second; - } - } - - if (FullFileBuffer) - { - IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize() - ? FullFileBuffer - : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); - - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference))); - if (CompBuf) - { - Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); - } - else - { - MalformedAttachments.push_back(std::make_pair(i, - fmt::format("Invalid format in '{}' (offset {}, size {}) for {}", - Path, - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize, - Entry.AttachmentHash))); - } - } - else - { - MalformedAttachments.push_back(std::make_pair(i, - fmt::format("Unable to resolve chunk at '{}' (offset {}, size {}) for {}", - Path, - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize, - Entry.AttachmentHash))); - } - } - else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) - { - if (Entry.Flags & CbAttachmentEntry::kIsObject) - { - if (i == 0) - { - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (CompBuf) - { - Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); - } - else - { - // First payload is always a compact binary object - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } - } - else - { - MalformedAttachments.push_back(std::make_pair( - i, - fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } - } - else - { - CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer))); - if (CompBuf) - { - Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); - } - else - { - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, expected compressed buffer for attachment (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } - } - } - else /* not compressed */ - { - if (Entry.Flags & CbAttachmentEntry::kIsObject) - { - if (i == 0) - { - Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); - } - else - { - MalformedAttachments.push_back( - std::make_pair(i, - fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}", - AttachmentBuffer.GetSize(), - Entry.AttachmentHash))); - } - } - else if (AttachmentSize > 0) - { - // Make a copy of the buffer so the attachments don't reference the entire payload - IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize); - ZEN_ASSERT(AttachmentBufferCopy); - ZEN_ASSERT(AttachmentBufferCopy.Size() == AttachmentSize); - AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView()); - - Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy}); - } - else - { - MalformedAttachments.push_back( - std::make_pair(i, fmt::format("Invalid format, attachment of size zero detected for {}", Entry.AttachmentHash))); - } - } - } - PartialFileBuffers.clear(); - - Package.AddAttachments(Attachments); - - using namespace std::literals; - - if (!MalformedAttachments.empty()) - { - StringBuilder<1024> SB; - SB << (uint64_t)MalformedAttachments.size() << " malformed attachments in package message:\n"; - for (const auto& It : MalformedAttachments) - { - SB << " #"sv << It.first << ": " << It.second << "\n"; - } - ZEN_WARN("{}", SB.ToView()); - throw std::invalid_argument(SB.ToString()); - } - - return Package; -} - -bool -ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage) -{ - if (IsPackageMessage(Response)) - { - OutPackage = ParsePackageMessage(Response); - return true; - } - return OutPackage.TryLoad(Response); -} - -CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }) -{ -} - -CbPackageReader::~CbPackageReader() -{ -} - -void -CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer) -{ - m_CreateBuffer = CreateBuffer; -} - -uint64_t -CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes) -{ - ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); - - switch (m_CurrentState) - { - case State::kInitialState: - ZEN_ASSERT(Data == nullptr); - m_CurrentState = State::kReadingHeader; - return sizeof m_PackageHeader; - - case State::kReadingHeader: - ZEN_ASSERT(DataBytes == sizeof m_PackageHeader); - memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader); - ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic); - m_CurrentState = State::kReadingAttachmentEntries; - m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1); - return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry); - - case State::kReadingAttachmentEntries: - ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry))); - memcpy(m_AttachmentEntries.data(), Data, DataBytes); - - for (CbAttachmentEntry& Entry : m_AttachmentEntries) - { - // This preallocates memory for payloads but note that for the local references - // the caller will need to handle the payload differently (i.e it's a - // CbAttachmentReferenceHeader not the actual payload) - - m_PayloadBuffers.emplace_back(IoBuffer{Entry.PayloadSize}); - } - - m_CurrentState = State::kReadingBuffers; - return 0; - - default: - ZEN_ASSERT(false); - return 0; - } -} - -IoBuffer -CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer) -{ - // Marshal local reference - a "pointer" to the chunk backing file - - ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); - - const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); - const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); - - ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); - - std::u8string_view PathView{PathPointer, AttachRefHdr->AbsolutePathLength}; - - std::filesystem::path Path{PathView}; - - IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); - - if (!ChunkReference) - { - // Unable to open chunk reference - - throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", - PathToUtf8(Path), - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize)); - } - - return ChunkReference; -}; - -void -CbPackageReader::Finalize() -{ - if (m_AttachmentEntries.empty()) - { - return; - } - - m_Attachments.reserve(m_AttachmentEntries.size() - 1); - - int CurrentAttachmentIndex = 0; - for (CbAttachmentEntry& Entry : m_AttachmentEntries) - { - IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; - - if (CurrentAttachmentIndex == 0) - { - // Root object - if (Entry.Flags & CbAttachmentEntry::kIsObject) - { - if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) - { - m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); - } - else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer), RawHash, RawSize); - if (RawHash == Entry.AttachmentHash) - { - m_RootObject = LoadCompactBinaryObject(Compressed); - } - } - else - { - m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); - } - } - else - { - throw std::runtime_error("missing or invalid root object"); - } - } - else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) - { - IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); - - if (Entry.Flags & CbAttachmentEntry::kIsCompressed) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference), RawHash, RawSize); - if (RawHash == Entry.AttachmentHash) - { - m_Attachments.emplace_back(CbAttachment(Compressed, Entry.AttachmentHash)); - } - } - else - { - CompressedBuffer Compressed = - CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None); - m_Attachments.emplace_back(CbAttachment(std::move(Compressed), Compressed.DecodeRawHash())); - } - } - - ++CurrentAttachmentIndex; - } -} - -/** - ______________________ _____________________________ - \__ ___/\_ _____// _____/\__ ___/ _____/ - | | | __)_ \_____ \ | | \_____ \ - | | | \/ \ | | / \ - |____| /_______ /_______ / |____| /_______ / - \/ \/ \/ - */ - -#if ZEN_WITH_TESTS - -TEST_CASE("CbPackage.Serialization") -{ - // Make a test package - - CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))}; - CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))}; - - CbObjectWriter Cbo; - Cbo.AddAttachment("abcd", Attach1); - Cbo.AddAttachment("efgh", Attach2); - - CbPackage Pkg; - Pkg.AddAttachment(Attach1); - Pkg.AddAttachment(Attach2); - Pkg.SetObject(Cbo.Save()); - - SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten(); - const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); - uint64_t RemainingBytes = Buffer.GetSize(); - - auto ConsumeBytes = [&](uint64_t ByteCount) { - ZEN_ASSERT(ByteCount <= RemainingBytes); - void* ReturnPtr = (void*)CursorPtr; - CursorPtr += ByteCount; - RemainingBytes -= ByteCount; - return ReturnPtr; - }; - - auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { - ZEN_ASSERT(ByteCount <= RemainingBytes); - memcpy(TargetBuffer, CursorPtr, ByteCount); - CursorPtr += ByteCount; - RemainingBytes -= ByteCount; - }; - - CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); - auto Buffers = Reader.GetPayloadBuffers(); - - for (auto& PayloadBuffer : Buffers) - { - CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); - } - - Reader.Finalize(); -} - -TEST_CASE("CbPackage.EmptyObject") -{ - CbPackage Pkg; - Pkg.SetObject({}); - std::vector<IoBuffer> Result = FormatPackageMessage(Pkg, nullptr); -} - -TEST_CASE("CbPackage.LocalRef") -{ - ScopedTemporaryDirectory TempDir; - - auto Path1 = TempDir.Path() / "abcd"; - auto Path2 = TempDir.Path() / "efgh"; - - { - IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); - IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); - - WriteFile(Path1, Buffer1); - WriteFile(Path2, Buffer2); - } - - // Make a test package - - IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); - IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); - - CbAttachment Attach1{SharedBuffer(FileBuffer1)}; - CbAttachment Attach2{SharedBuffer(FileBuffer2)}; - - CbObjectWriter Cbo; - Cbo.AddAttachment("abcd", Attach1); - Cbo.AddAttachment("efgh", Attach2); - - CbPackage Pkg; - Pkg.AddAttachment(Attach1); - Pkg.AddAttachment(Attach2); - Pkg.SetObject(Cbo.Save()); - - SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); - const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); - uint64_t RemainingBytes = Buffer.GetSize(); - - auto ConsumeBytes = [&](uint64_t ByteCount) { - ZEN_ASSERT(ByteCount <= RemainingBytes); - void* ReturnPtr = (void*)CursorPtr; - CursorPtr += ByteCount; - RemainingBytes -= ByteCount; - return ReturnPtr; - }; - - auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { - ZEN_ASSERT(ByteCount <= RemainingBytes); - memcpy(TargetBuffer, CursorPtr, ByteCount); - CursorPtr += ByteCount; - RemainingBytes -= ByteCount; - }; - - CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); - auto Buffers = Reader.GetPayloadBuffers(); - - for (auto& PayloadBuffer : Buffers) - { - CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); - } - - Reader.Finalize(); -} - -void -forcelink_packageformat() -{ -} - -#endif - -} // namespace zen diff --git a/src/zenutil/xmake.lua b/src/zenutil/xmake.lua index 744dff737..3d95651f2 100644 --- a/src/zenutil/xmake.lua +++ b/src/zenutil/xmake.lua @@ -6,5 +6,5 @@ target('zenutil') add_headerfiles("**.h") add_files("**.cpp") add_includedirs("include", {public=true}) - add_deps("zencore") + add_deps("zencore", "zenhttp") add_packages("vcpkg::robin-map", "vcpkg::spdlog") diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 214737425..b36f11741 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -2,6 +2,7 @@ #include "zenutil/zenserverprocess.h" +#include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/except.h> @@ -12,7 +13,6 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> -#include <zenutil/basicfile.h> #include <atomic> diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 97ebeb01d..c54144549 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -4,20 +4,16 @@ #if ZEN_WITH_TESTS -# include <zenutil/basicfile.h> # include <zenutil/cache/cacherequests.h> # include <zenutil/cache/rpcrecording.h> -# include <zenutil/packageformat.h> namespace zen { void zenutil_forcelinktests() { - basicfile_forcelink(); cachepolicy_forcelink(); cache::rpcrecord_forcelink(); - forcelink_packageformat(); cacherequests_forcelink(); } |