aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/basicfile.cpp1033
-rw-r--r--src/zenutil/cache/rpcrecording.cpp5
-rw-r--r--src/zenutil/include/zenutil/basicfile.h185
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterclient.h57
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h152
-rw-r--r--src/zenutil/include/zenutil/logging/rotatingfilesink.h2
-rw-r--r--src/zenutil/include/zenutil/packageformat.h164
-rw-r--r--src/zenutil/jupiter/jupiterclient.cpp29
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp505
-rw-r--r--src/zenutil/packageformat.cpp894
-rw-r--r--src/zenutil/xmake.lua2
-rw-r--r--src/zenutil/zenserverprocess.cpp2
-rw-r--r--src/zenutil/zenutil.cpp4
13 files changed, 749 insertions, 2285 deletions
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp
deleted file mode 100644
index 391c150c6..000000000
--- a/src/zenutil/basicfile.cpp
+++ /dev/null
@@ -1,1033 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zenutil/basicfile.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/except.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/memory/memory.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#else
-# include <fcntl.h>
-# include <sys/file.h>
-# include <sys/stat.h>
-# include <unistd.h>
-#endif
-
-#include <fmt/format.h>
-#include <gsl/gsl-lite.hpp>
-
-namespace zen {
-
-BasicFile::~BasicFile()
-{
- Close();
-}
-
-void
-BasicFile::Open(const std::filesystem::path& FileName, Mode Mode)
-{
- std::error_code Ec;
- Open(FileName, Mode, Ec);
-
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("failed to open file '{}', mode: {:x}", FileName, uint32_t(Mode)));
- }
-}
-
-void
-BasicFile::Open(const std::filesystem::path& FileName, Mode InMode, std::error_code& Ec)
-{
- Ec.clear();
-
- Mode Mode = InMode & Mode::kModeMask;
-
-#if ZEN_PLATFORM_WINDOWS
- DWORD dwCreationDisposition = 0;
- DWORD dwDesiredAccess = 0;
- switch (Mode)
- {
- case Mode::kRead:
- dwCreationDisposition |= OPEN_EXISTING;
- dwDesiredAccess |= GENERIC_READ;
- break;
- case Mode::kWrite:
- dwCreationDisposition |= OPEN_ALWAYS;
- dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE);
- break;
- case Mode::kDelete:
- dwCreationDisposition |= OPEN_ALWAYS;
- dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE);
- break;
- case Mode::kTruncate:
- dwCreationDisposition |= CREATE_ALWAYS;
- dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE);
- break;
- case Mode::kTruncateDelete:
- dwCreationDisposition |= CREATE_ALWAYS;
- dwDesiredAccess |= (GENERIC_READ | GENERIC_WRITE | DELETE);
- break;
- }
-
- const DWORD dwShareMode = FILE_SHARE_READ | (EnumHasAllFlags(InMode, Mode::kPreventWrite) ? 0 : FILE_SHARE_WRITE) |
- (EnumHasAllFlags(InMode, Mode::kPreventDelete) ? 0 : FILE_SHARE_DELETE);
- const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
- const HANDLE hTemplateFile = nullptr;
- const HANDLE FileHandle = CreateFile(FileName.c_str(),
- dwDesiredAccess,
- dwShareMode,
- /* lpSecurityAttributes */ nullptr,
- dwCreationDisposition,
- dwFlagsAndAttributes,
- hTemplateFile);
-
- if (FileHandle == INVALID_HANDLE_VALUE)
- {
- Ec = MakeErrorCodeFromLastError();
-
- return;
- }
-#else
- int OpenFlags = O_CLOEXEC;
- switch (Mode)
- {
- case Mode::kRead:
- OpenFlags |= O_RDONLY;
- break;
- case Mode::kWrite:
- case Mode::kDelete:
- OpenFlags |= (O_RDWR | O_CREAT);
- break;
- case Mode::kTruncate:
- case Mode::kTruncateDelete:
- OpenFlags |= (O_RDWR | O_CREAT | O_TRUNC);
- break;
- }
-
- int Fd = open(FileName.c_str(), OpenFlags, 0666);
- if (Fd < 0)
- {
- Ec = MakeErrorCodeFromLastError();
- return;
- }
- if (Mode != Mode::kRead)
- {
- fchmod(Fd, 0666);
- }
-
- void* FileHandle = (void*)(uintptr_t(Fd));
-#endif
-
- m_FileHandle = FileHandle;
-}
-
-void
-BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback)
-{
- std::error_code Ec;
- Open(FileName, Mode, Ec);
- while (Ec && RetryCallback(Ec))
- {
- Ec.clear();
- Open(FileName, Mode, Ec);
- }
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("failed to open file '{}', mode: {:x}", FileName, uint32_t(Mode)));
- }
-}
-
-void
-BasicFile::Close()
-{
- if (m_FileHandle)
- {
-#if ZEN_PLATFORM_WINDOWS
- ::CloseHandle(m_FileHandle);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- close(Fd);
-#endif
- m_FileHandle = nullptr;
- }
-}
-
-IoBuffer
-BasicFile::ReadRange(uint64_t FileOffset, uint64_t ByteCount)
-{
- return IoBufferBuilder::MakeFromFileHandle(m_FileHandle, FileOffset, ByteCount);
-}
-
-void
-BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
-{
- const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
-
- while (BytesToRead)
- {
- const uint64_t NumberOfBytesToRead = Min(BytesToRead, MaxChunkSize);
- int32_t Error = 0;
- size_t BytesRead = 0;
-
-#if ZEN_PLATFORM_WINDOWS
- OVERLAPPED Ovl{};
-
- Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(FileOffset >> 32);
-
- DWORD dwNumberOfBytesRead = 0;
- BOOL Success = ::ReadFile(m_FileHandle, Data, DWORD(NumberOfBytesToRead), &dwNumberOfBytesRead, &Ovl);
- if (Success)
- {
- BytesRead = size_t(dwNumberOfBytesRead);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#else
- static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
- int Fd = int(uintptr_t(m_FileHandle));
- ssize_t ReadResult = pread(Fd, Data, NumberOfBytesToRead, FileOffset);
- if (ReadResult != -1)
- {
- BytesRead = size_t(ReadResult);
- }
- else
- {
- Error = zen::GetLastError();
- }
-#endif
-
- if (Error || (BytesRead != NumberOfBytesToRead))
- {
- std::error_code DummyEc;
- throw std::system_error(std::error_code(Error, std::system_category()),
- fmt::format("ReadFile/pread failed (offset {:#x}, size {:#x}) file: '{}' (size {:#x})",
- FileOffset,
- NumberOfBytesToRead,
- PathFromHandle(m_FileHandle, DummyEc),
- FileSizeFromHandle(m_FileHandle)));
- }
-
- BytesToRead -= NumberOfBytesToRead;
- FileOffset += NumberOfBytesToRead;
- Data = reinterpret_cast<uint8_t*>(Data) + NumberOfBytesToRead;
- }
-}
-
-IoBuffer
-BasicFile::ReadAll()
-{
- if (const uint64_t Size = FileSize())
- {
- IoBuffer Buffer(Size);
- Read(Buffer.MutableData(), Size, 0);
- return Buffer;
- }
- else
- {
- return {};
- }
-}
-
-void
-BasicFile::StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
-{
- StreamByteRange(0, FileSize(), std::move(ChunkFun));
-}
-
-void
-BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun)
-{
- const uint64_t ChunkSize = 128 * 1024;
- IoBuffer ReadBuffer{ChunkSize};
- void* BufferPtr = ReadBuffer.MutableData();
-
- uint64_t RemainBytes = Size;
- uint64_t CurrentOffset = FileOffset;
-
- while (RemainBytes)
- {
- const uint64_t ThisChunkBytes = zen::Min(ChunkSize, RemainBytes);
-
- Read(BufferPtr, ThisChunkBytes, CurrentOffset);
-
- ChunkFun(BufferPtr, ThisChunkBytes);
-
- CurrentOffset += ThisChunkBytes;
- RemainBytes -= ThisChunkBytes;
- }
-}
-
-uint64_t
-BasicFile::Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec)
-{
- uint64_t WrittenBytes = 0;
- for (const SharedBuffer& Buffer : Data.GetSegments())
- {
- MemoryView BlockView = Buffer.GetView();
- Write(BlockView, FileOffset + WrittenBytes, Ec);
-
- if (Ec)
- {
- return WrittenBytes;
- }
-
- WrittenBytes += BlockView.GetSize();
- }
-
- return WrittenBytes;
-}
-
-void
-BasicFile::Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec)
-{
- Write(Data.GetData(), Data.GetSize(), FileOffset, Ec);
-}
-
-void
-BasicFile::Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec)
-{
- Ec.clear();
-
- const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
-
- while (Size)
- {
- const uint64_t NumberOfBytesToWrite = Min(Size, MaxChunkSize);
-
-#if ZEN_PLATFORM_WINDOWS
- OVERLAPPED Ovl{};
-
- Ovl.Offset = DWORD(FileOffset & 0xffff'ffffu);
- Ovl.OffsetHigh = DWORD(FileOffset >> 32);
-
- DWORD dwNumberOfBytesWritten = 0;
-
- BOOL Success = ::WriteFile(m_FileHandle, Data, DWORD(NumberOfBytesToWrite), &dwNumberOfBytesWritten, &Ovl);
-#else
- static_assert(sizeof(off_t) >= sizeof(uint64_t), "sizeof(off_t) does not support large files");
- int Fd = int(uintptr_t(m_FileHandle));
- int BytesWritten = pwrite(Fd, Data, NumberOfBytesToWrite, FileOffset);
- bool Success = (BytesWritten > 0);
-#endif
-
- if (!Success)
- {
- Ec = MakeErrorCodeFromLastError();
-
- return;
- }
-
- Size -= NumberOfBytesToWrite;
- FileOffset += NumberOfBytesToWrite;
- Data = reinterpret_cast<const uint8_t*>(Data) + NumberOfBytesToWrite;
- }
-}
-
-void
-BasicFile::Write(MemoryView Data, uint64_t FileOffset)
-{
- Write(Data.GetData(), Data.GetSize(), FileOffset);
-}
-
-void
-BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
-{
- std::error_code Ec;
- Write(Data, Size, Offset, Ec);
-
- if (Ec)
- {
- std::error_code Dummy;
- throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
- }
-}
-
-void
-BasicFile::WriteAll(IoBuffer Data, std::error_code& Ec)
-{
- Write(Data.Data(), Data.Size(), 0, Ec);
-}
-
-void
-BasicFile::Flush()
-{
- if (m_FileHandle == nullptr)
- {
- return;
- }
-#if ZEN_PLATFORM_WINDOWS
- FlushFileBuffers(m_FileHandle);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- fsync(Fd);
-#endif
-}
-
-uint64_t
-BasicFile::FileSize() const
-{
-#if ZEN_PLATFORM_WINDOWS
- ULARGE_INTEGER liFileSize;
- liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart);
- if (liFileSize.LowPart == INVALID_FILE_SIZE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
- }
- }
- return uint64_t(liFileSize.QuadPart);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files");
- struct stat Stat;
- if (fstat(Fd, &Stat) == -1)
- {
- std::error_code Dummy;
- ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
- }
- return uint64_t(Stat.st_size);
-#endif
-}
-
-uint64_t
-BasicFile::FileSize(std::error_code& Ec) const
-{
-#if ZEN_PLATFORM_WINDOWS
- ULARGE_INTEGER liFileSize;
- liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart);
- if (liFileSize.LowPart == INVALID_FILE_SIZE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- Ec = MakeErrorCode(Error);
- return 0;
- }
- }
- return uint64_t(liFileSize.QuadPart);
-#else
- int Fd = int(uintptr_t(m_FileHandle));
- static_assert(sizeof(decltype(stat::st_size)) == sizeof(uint64_t), "fstat() doesn't support large files");
- struct stat Stat;
- if (fstat(Fd, &Stat) == -1)
- {
- Ec = MakeErrorCodeFromLastError();
- return 0;
- }
- return uint64_t(Stat.st_size);
-#endif
-}
-
-void
-BasicFile::SetFileSize(uint64_t FileSize)
-{
-#if ZEN_PLATFORM_WINDOWS
- LARGE_INTEGER liFileSize;
- liFileSize.QuadPart = FileSize;
- BOOL OK = ::SetFilePointerEx(m_FileHandle, liFileSize, 0, FILE_BEGIN);
- if (OK == FALSE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error,
- fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
- }
- }
- OK = ::SetEndOfFile(m_FileHandle);
- if (OK == FALSE)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error,
- fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
- }
- }
-#elif ZEN_PLATFORM_MAC
- int Fd = int(intptr_t(m_FileHandle));
- if (ftruncate(Fd, (off_t)FileSize) < 0)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error,
- fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
- }
- }
-#else
- int Fd = int(intptr_t(m_FileHandle));
- if (ftruncate64(Fd, (off64_t)FileSize) < 0)
- {
- int Error = zen::GetLastError();
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error,
- fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
- }
- }
- if (FileSize > 0)
- {
- int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize);
- if (Error)
- {
- std::error_code Dummy;
- ThrowSystemError(Error,
- fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
- }
- }
-#endif
-}
-
-void
-BasicFile::Attach(void* Handle)
-{
- ZEN_ASSERT(Handle != nullptr);
- ZEN_ASSERT(m_FileHandle == nullptr);
- m_FileHandle = Handle;
-}
-
-void*
-BasicFile::Detach()
-{
- void* FileHandle = m_FileHandle;
- m_FileHandle = 0;
- return FileHandle;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-TemporaryFile::~TemporaryFile()
-{
- Close();
-}
-
-void
-TemporaryFile::Close()
-{
- if (m_FileHandle)
- {
-#if ZEN_PLATFORM_WINDOWS
- // Mark file for deletion when final handle is closed
-
- FILE_DISPOSITION_INFO Fdi{.DeleteFile = TRUE};
-
- SetFileInformationByHandle(m_FileHandle, FileDispositionInfo, &Fdi, sizeof Fdi);
-#else
- std::error_code Ec;
- std::filesystem::path FilePath = zen::PathFromHandle(m_FileHandle, Ec);
- if (!Ec)
- {
- unlink(FilePath.c_str());
- }
-#endif
-
- BasicFile::Close();
- }
-}
-
-void
-TemporaryFile::CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec)
-{
- StringBuilder<64> TempName;
- Oid::NewOid().ToString(TempName);
-
- m_TempPath = TempDirName / TempName.c_str();
-
- Open(m_TempPath, BasicFile::Mode::kTruncateDelete, Ec);
-}
-
-void
-TemporaryFile::MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec)
-{
- // We intentionally call the base class Close() since otherwise we'll end up
- // deleting the temporary file
- BasicFile::Close();
-
- std::filesystem::rename(m_TempPath, FinalFileName, Ec);
-
- if (Ec)
- {
- // Try to re-open the temp file so we clean up after us when TemporaryFile is destructed
- std::error_code DummyEc;
- Open(m_TempPath, BasicFile::Mode::kWrite, DummyEc);
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data)
-{
- TemporaryFile TempFile;
- std::error_code Ec;
- SafeWriteFile(Path, Data, Ec);
- if (Ec)
- {
- throw std::system_error(Ec, fmt::format("Failed to safely write file '{}'", Path));
- }
-}
-
-void
-TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc)
-{
- TemporaryFile TempFile;
- if (TempFile.CreateTemporary(Path.parent_path(), OutEc); !OutEc)
- {
- if (TempFile.Write(Data, 0, OutEc); !OutEc)
- {
- TempFile.MoveTemporaryIntoPlace(Path, OutEc);
- }
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-LockFile::LockFile()
-{
-}
-
-LockFile::~LockFile()
-{
-#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- int Fd = int(intptr_t(m_FileHandle));
- flock(Fd, LOCK_UN | LOCK_NB);
-#endif
-}
-
-void
-LockFile::Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec)
-{
-#if ZEN_PLATFORM_WINDOWS
- Ec.clear();
-
- const DWORD dwCreationDisposition = CREATE_ALWAYS;
- DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE | DELETE;
- const DWORD dwShareMode = FILE_SHARE_READ;
- const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_DELETE_ON_CLOSE;
- HANDLE hTemplateFile = nullptr;
-
- HANDLE FileHandle = CreateFile(FileName.c_str(),
- dwDesiredAccess,
- dwShareMode,
- /* lpSecurityAttributes */ nullptr,
- dwCreationDisposition,
- dwFlagsAndAttributes,
- hTemplateFile);
-
- if (FileHandle == INVALID_HANDLE_VALUE)
- {
- Ec = zen::MakeErrorCodeFromLastError();
-
- return;
- }
-#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
- int Fd = open(FileName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
- if (Fd < 0)
- {
- Ec = zen::MakeErrorCodeFromLastError();
- return;
- }
- fchmod(Fd, 0666);
-
- int LockRet = flock(Fd, LOCK_EX | LOCK_NB);
- if (LockRet < 0)
- {
- Ec = zen::MakeErrorCodeFromLastError();
- close(Fd);
- return;
- }
-
- void* FileHandle = (void*)uintptr_t(Fd);
-#endif
-
- m_FileHandle = FileHandle;
-
- BasicFile::Write(Payload.GetBuffer(), 0, Ec);
-}
-
-void
-LockFile::Update(CbObject Payload, std::error_code& Ec)
-{
- BasicFile::Write(Payload.GetBuffer(), 0, Ec);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-BasicFileBuffer::BasicFileBuffer(BasicFile& Base, uint64_t BufferSize)
-: m_Base(Base)
-, m_Buffer(nullptr)
-, m_BufferSize(BufferSize)
-, m_Size(Base.FileSize())
-, m_BufferStart(0)
-, m_BufferEnd(0)
-{
- m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize);
-}
-
-BasicFileBuffer::~BasicFileBuffer()
-{
- Memory::Free(m_Buffer);
-}
-
-void
-BasicFileBuffer::Read(void* Data, uint64_t Size, uint64_t FileOffset)
-{
- if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size))
- {
- m_Base.Read(Data, Size, FileOffset);
- return;
- }
- uint8_t* WritePtr = ((uint8_t*)Data);
- uint64_t Begin = FileOffset;
- uint64_t End = FileOffset + Size;
- if (FileOffset <= m_BufferStart)
- {
- if (End > m_BufferStart)
- {
- uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
- memcpy(WritePtr + End - Count - FileOffset, m_Buffer, Count);
- End -= Count;
- if (Begin == End)
- {
- return;
- }
- }
- }
- else if (FileOffset < m_BufferEnd)
- {
- uint64_t Count = Min(m_BufferEnd, End) - FileOffset;
- memcpy(WritePtr + Begin - FileOffset, m_Buffer + Begin - m_BufferStart, Count);
- Begin += Count;
- if (Begin == End)
- {
- return;
- }
- }
- m_BufferStart = Begin;
- m_BufferEnd = Min(Begin + m_BufferSize, m_Size);
- m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
- uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart;
- memcpy(WritePtr + Begin - FileOffset, m_Buffer, Count);
- ZEN_ASSERT(Begin + Count == End);
-}
-
-MemoryView
-BasicFileBuffer::MakeView(uint64_t Size, uint64_t FileOffset)
-{
- if (FileOffset < m_BufferStart || (FileOffset + Size) > m_BufferEnd)
- {
- if (m_Buffer == nullptr || (Size > m_BufferSize) || (FileOffset + Size > m_Size))
- {
- return {};
- }
- m_BufferStart = FileOffset;
- m_BufferEnd = Min(m_BufferStart + m_BufferSize, m_Size);
- m_Base.Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart);
- }
- return MemoryView(m_Buffer + (FileOffset - m_BufferStart), Size);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-BasicFileWriter::BasicFileWriter(BasicFile& Base, uint64_t BufferSize)
-: m_Base(Base)
-, m_Buffer(nullptr)
-, m_BufferSize(BufferSize)
-, m_BufferStart(0)
-, m_BufferEnd(0)
-{
- m_Buffer = (uint8_t*)Memory::Alloc(m_BufferSize);
-}
-
-BasicFileWriter::~BasicFileWriter()
-{
- Flush();
- Memory::Free(m_Buffer);
-}
-
-void
-BasicFileWriter::Write(const void* Data, uint64_t Size, uint64_t FileOffset)
-{
- if (m_Buffer == nullptr || (Size >= m_BufferSize))
- {
- m_Base.Write(Data, Size, FileOffset);
- return;
- }
-
- // Note that this only supports buffering of sequential writes!
-
- if (FileOffset != m_BufferEnd)
- {
- Flush();
- m_BufferStart = m_BufferEnd = FileOffset;
- }
-
- const uint8_t* DataPtr = (const uint8_t*)Data;
- while (Size)
- {
- const uint64_t RemainingBufferCapacity = m_BufferStart + m_BufferSize - m_BufferEnd;
- const uint64_t BlockWriteBytes = Min(RemainingBufferCapacity, Size);
- const uint64_t BufferWriteOffset = FileOffset - m_BufferStart;
-
- ZEN_ASSERT_SLOW(BufferWriteOffset < m_BufferSize);
- ZEN_ASSERT_SLOW((BufferWriteOffset + BlockWriteBytes) <= m_BufferSize);
-
- memcpy(m_Buffer + BufferWriteOffset, DataPtr, BlockWriteBytes);
-
- Size -= BlockWriteBytes;
- m_BufferEnd += BlockWriteBytes;
- FileOffset += BlockWriteBytes;
- DataPtr += BlockWriteBytes;
-
- if ((m_BufferEnd - m_BufferStart) == m_BufferSize)
- {
- Flush();
- }
- }
-}
-
-void
-BasicFileWriter::Flush()
-{
- const uint64_t BufferedBytes = m_BufferEnd - m_BufferStart;
-
- if (BufferedBytes == 0)
- return;
-
- const uint64_t WriteOffset = m_BufferStart;
- m_BufferStart = m_BufferEnd;
-
- m_Base.Write(m_Buffer, BufferedBytes, WriteOffset);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-/*
- ___________ __
- \__ ___/___ _______/ |_ ______
- | |_/ __ \ / ___/\ __\/ ___/
- | |\ ___/ \___ \ | | \___ \
- |____| \___ >____ > |__| /____ >
- \/ \/ \/
-*/
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("BasicFile")
-{
- ScopedCurrentDirectoryChange _;
-
- BasicFile File1;
- CHECK_THROWS(File1.Open("zonk", BasicFile::Mode::kRead));
- CHECK_NOTHROW(File1.Open("zonk", BasicFile::Mode::kTruncate));
- CHECK_NOTHROW(File1.Write("abcd", 4, 0));
- CHECK(File1.FileSize() == 4);
- {
- IoBuffer Data = File1.ReadAll();
- CHECK(Data.Size() == 4);
- CHECK_EQ(memcmp(Data.Data(), "abcd", 4), 0);
- }
- CHECK_NOTHROW(File1.Write("efgh", 4, 2));
- CHECK(File1.FileSize() == 6);
- {
- IoBuffer Data = File1.ReadAll();
- CHECK(Data.Size() == 6);
- CHECK_EQ(memcmp(Data.Data(), "abefgh", 6), 0);
- }
-}
-
-TEST_CASE("TemporaryFile")
-{
- ScopedCurrentDirectoryChange _;
-
- SUBCASE("DeleteOnClose")
- {
- std::filesystem::path Path;
- {
- TemporaryFile TmpFile;
- std::error_code Ec;
- TmpFile.CreateTemporary(std::filesystem::current_path(), Ec);
- CHECK(!Ec);
- Path = TmpFile.GetPath();
- CHECK(std::filesystem::exists(Path));
- }
- CHECK(std::filesystem::exists(Path) == false);
- }
-
- SUBCASE("MoveIntoPlace")
- {
- TemporaryFile TmpFile;
- std::error_code Ec;
- TmpFile.CreateTemporary(std::filesystem::current_path(), Ec);
- CHECK(!Ec);
- std::filesystem::path TempPath = TmpFile.GetPath();
- std::filesystem::path FinalPath = std::filesystem::current_path() / "final";
- CHECK(std::filesystem::exists(TempPath));
- TmpFile.MoveTemporaryIntoPlace(FinalPath, Ec);
- CHECK(!Ec);
- CHECK(std::filesystem::exists(TempPath) == false);
- CHECK(std::filesystem::exists(FinalPath));
- }
-}
-
-TEST_CASE("BasicFileBuffer")
-{
- ScopedCurrentDirectoryChange _;
- {
- BasicFile File1;
- const std::string_view Data = "0123456789abcdef";
- File1.Open("buffered", BasicFile::Mode::kTruncate);
- for (uint32_t I = 0; I < 16; ++I)
- {
- File1.Write(Data.data(), Data.size(), I * Data.size());
- }
- }
- SUBCASE("EvenBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- // Non-primed
- {
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 1 * 16);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "0123456789abcdef");
- }
- // Primed
- {
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 1 * 16);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "0123456789abcdef");
- }
- }
- SUBCASE("UnevenBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- // Non-primed
- {
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 7);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "789abcdef0123456");
- }
- // Primed
- {
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 7);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "789abcdef0123456");
- }
- }
- SUBCASE("BiggerThanBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[17] = {0};
- File1Buffer.Read(Buffer, 17, 0 * 16);
- std::string_view Verify(Buffer, 17);
- CHECK(Verify == "0123456789abcdef0");
- }
- SUBCASE("InsideBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 0 * 16);
-
- File1Buffer.Read(Buffer, 8, 2);
- std::string_view Verify(Buffer, 8);
- CHECK(Verify == "23456789");
- }
- SUBCASE("BeginningOfBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 8);
-
- File1Buffer.Read(Buffer, 8, 8);
- std::string_view Verify(Buffer, 8);
- CHECK(Verify == "89abcdef");
- }
- SUBCASE("EndOfBuffer")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 0 * 16);
-
- File1Buffer.Read(Buffer, 8, 8);
- std::string_view Verify(Buffer, 8);
- CHECK(Verify == "89abcdef");
- }
- SUBCASE("OverEnd")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 0 * 16);
-
- File1Buffer.Read(Buffer, 16, 8);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "89abcdef01234567");
- }
- SUBCASE("OverBegin")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 1 * 16);
-
- File1Buffer.Read(Buffer, 16, 8);
- std::string_view Verify(Buffer, 16);
- CHECK(Verify == "89abcdef01234567");
- }
- SUBCASE("EndOfFile")
- {
- BasicFile File1;
- File1.Open("buffered", BasicFile::Mode::kRead);
- BasicFileBuffer File1Buffer(File1, 16);
- char Buffer[16] = {0};
- File1Buffer.Read(Buffer, 16, 0 * 16);
-
- File1Buffer.Read(Buffer, 8, 256 - 8);
- std::string_view Verify(Buffer, 8);
- CHECK(Verify == "89abcdef");
- }
-}
-
-void
-basicfile_forcelink()
-{
-}
-
-#endif
-
-} // namespace zen
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 9bef4d1a4..1f951167d 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -1,5 +1,8 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zenutil/cache/rpcrecording.h>
+
+#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -8,8 +11,6 @@
#include <zencore/system.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
-#include <zenutil/basicfile.h>
-#include <zenutil/cache/rpcrecording.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zenutil/include/zenutil/basicfile.h
deleted file mode 100644
index 03c5605df..000000000
--- a/src/zenutil/include/zenutil/basicfile.h
+++ /dev/null
@@ -1,185 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#include <zencore/compositebuffer.h>
-#include <zencore/enumflags.h>
-#include <zencore/iobuffer.h>
-
-#include <filesystem>
-#include <functional>
-
-namespace zen {
-
-class CbObject;
-
-/**
- * Probably the most basic file abstraction in the universe
- *
- * One thing of note is that there is no notion of a "current file position"
- * in this API -- all reads and writes are done from explicit offsets in
- * the file. This avoids concurrency issues which can occur otherwise.
- *
- */
-
-class BasicFile
-{
-public:
- BasicFile() = default;
- ~BasicFile();
-
- BasicFile(const BasicFile&) = delete;
- BasicFile& operator=(const BasicFile&) = delete;
-
- enum class Mode : uint32_t
- {
- kRead = 0, // Opens a existing file for read only
- kWrite = 1, // Opens (or creates) a file for read and write
- kTruncate = 2, // Opens (or creates) a file for read and write and sets the size to zero
- kDelete = 3, // Opens (or creates) a file for read and write allowing .DeleteFile file disposition to be set
- kTruncateDelete =
- 4, // Opens (or creates) a file for read and write and sets the size to zero allowing .DeleteFile file disposition to be set
- kModeMask = 0x0007,
- kPreventDelete = 0x1000'0000, // Do not open with delete sharing mode (prevent other processes from deleting file while open)
- kPreventWrite = 0x2000'0000, // Do not open with write sharing mode (prevent other processes from writing to file while open)
- };
-
- void Open(const std::filesystem::path& FileName, Mode Mode);
- void Open(const std::filesystem::path& FileName, Mode Mode, std::error_code& Ec);
- void Open(const std::filesystem::path& FileName, Mode Mode, std::function<bool(std::error_code& Ec)>&& RetryCallback);
- void Close();
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- IoBuffer ReadRange(uint64_t FileOffset, uint64_t ByteCount);
- void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
- void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun);
- void Write(MemoryView Data, uint64_t FileOffset);
- void Write(MemoryView Data, uint64_t FileOffset, std::error_code& Ec);
- uint64_t Write(CompositeBuffer Data, uint64_t FileOffset, std::error_code& Ec);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset, std::error_code& Ec);
- void Flush();
- [[nodiscard]] uint64_t FileSize() const;
- [[nodiscard]] uint64_t FileSize(std::error_code& Ec) const;
- void SetFileSize(uint64_t FileSize);
- IoBuffer ReadAll();
- void WriteAll(IoBuffer Data, std::error_code& Ec);
- void Attach(void* Handle);
- void* Detach();
-
- inline void* Handle() { return m_FileHandle; }
- bool IsOpen() const { return m_FileHandle != nullptr; }
-
-protected:
- void* m_FileHandle = nullptr; // This is either null or valid
-private:
-};
-
-ENUM_CLASS_FLAGS(BasicFile::Mode);
-
-/**
- * Simple abstraction for a temporary file
- *
- * Works like a regular BasicFile but implements a simple mechanism to allow creating
- * a temporary file for writing in a directory which may later be moved atomically
- * into the intended location after it has been fully written to.
- *
- */
-
-class TemporaryFile : public BasicFile
-{
-public:
- TemporaryFile() = default;
- ~TemporaryFile();
-
- TemporaryFile(const TemporaryFile&) = delete;
- TemporaryFile& operator=(const TemporaryFile&) = delete;
-
- void CreateTemporary(std::filesystem::path TempDirName, std::error_code& Ec);
- void MoveTemporaryIntoPlace(std::filesystem::path FinalFileName, std::error_code& Ec);
- const std::filesystem::path& GetPath() const { return m_TempPath; }
-
- static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data);
- static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc);
-
-private:
- void Close();
- std::filesystem::path m_TempPath;
-
- using BasicFile::Open;
-};
-
-/** Lock file abstraction
-
- */
-
-class LockFile : protected BasicFile
-{
-public:
- LockFile();
- ~LockFile();
-
- void Create(std::filesystem::path FileName, CbObject Payload, std::error_code& Ec);
- void Update(CbObject Payload, std::error_code& Ec);
-
-private:
-};
-
-/** Adds a layer of buffered reading to a BasicFile
-
-This class is not intended for concurrent access, it is not thread safe.
-
-*/
-
-class BasicFileBuffer
-{
-public:
- BasicFileBuffer(BasicFile& Base, uint64_t BufferSize);
- ~BasicFileBuffer();
-
- void Read(void* Data, uint64_t Size, uint64_t FileOffset);
- MemoryView MakeView(uint64_t Size, uint64_t FileOffset);
-
- template<typename T>
- const T* MakeView(uint64_t FileOffset)
- {
- MemoryView View = MakeView(sizeof(T), FileOffset);
- return reinterpret_cast<const T*>(View.GetData());
- }
-
-private:
- BasicFile& m_Base;
- uint8_t* m_Buffer;
- const uint64_t m_BufferSize;
- uint64_t m_Size;
- uint64_t m_BufferStart;
- uint64_t m_BufferEnd;
-};
-
-/** Adds a layer of buffered writing to a BasicFile
-
-This class is not intended for concurrent access, it is not thread safe.
-
-*/
-
-class BasicFileWriter
-{
-public:
- BasicFileWriter(BasicFile& Base, uint64_t BufferSize);
- ~BasicFileWriter();
-
- void Write(const void* Data, uint64_t Size, uint64_t FileOffset);
- void Flush();
-
-private:
- BasicFile& m_Base;
- uint8_t* m_Buffer;
- const uint64_t m_BufferSize;
- uint64_t m_BufferStart;
- uint64_t m_BufferEnd;
-};
-
-ZENCORE_API void basicfile_forcelink();
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
new file mode 100644
index 000000000..defe50edc
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
@@ -0,0 +1,57 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenbase/refcount.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <chrono>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterClientOptions
+{
+ std::string_view Name;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+ bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
+};
+
+/**
+ * Jupiter upstream cache client
+ */
+class JupiterClient : public RefCounted
+{
+public:
+ JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider);
+ ~JupiterClient();
+
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
+
+ LoggerRef Logger() { return m_Log; }
+ HttpClient& Client() { return m_HttpClient; }
+
+private:
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ std::function<HttpClientAccessToken()> m_TokenProvider;
+ HttpClient m_HttpClient;
+
+ friend class JupiterSession;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
new file mode 100644
index 000000000..6a80332f4
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -0,0 +1,152 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <set>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterResult
+{
+ IoBuffer Response;
+ uint64_t SentBytes{};
+ uint64_t ReceivedBytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct JupiterExistsResult : JupiterResult
+{
+ std::set<IoHash> Needs;
+};
+
+struct GetObjectReferencesResult : JupiterResult
+{
+ std::set<IoHash> References;
+};
+
+struct PutBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+/**
+ * Context for performing Jupiter operations
+ *
+ * Maintains an HTTP connection so that subsequent operations don't need to go
+ * through the whole connection setup process
+ *
+ */
+class JupiterSession
+{
+public:
+ JupiterSession(LoggerRef InLog, HttpClient& InHttpClient);
+ ~JupiterSession();
+
+ JupiterResult Authenticate();
+
+ JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
+ JupiterResult GetObject(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath = {});
+
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
+ JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
+
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
+ JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
+
+ JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key);
+
+ JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+ JupiterResult PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload);
+ FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+
+private:
+ inline LoggerRef Log() { return m_Log; }
+
+ JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
+
+ JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
+
+ LoggerRef m_Log;
+ HttpClient& m_HttpClient;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
index 3eb9021dd..758722156 100644
--- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h
+++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h
@@ -2,8 +2,8 @@
#pragma once
+#include <zencore/basicfile.h>
#include <zencore/memory/llm.h>
-#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <spdlog/details/log_msg.h>
diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenutil/include/zenutil/packageformat.h
deleted file mode 100644
index c90b840da..000000000
--- a/src/zenutil/include/zenutil/packageformat.h
+++ /dev/null
@@ -1,164 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/compactbinarypackage.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-
-#include <functional>
-#include <gsl/gsl-lite.hpp>
-
-namespace zen {
-
-class IoBuffer;
-class CbPackage;
-class CompositeBuffer;
-
-/** _____ _ _____ _
- / ____| | | __ \ | |
- | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___
- | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \
- | |____| |_) | | | (_| | (__| < (_| | (_| | __/
- \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___|
- __/ |
- |___/
-
- Structures and code related to handling CbPackage transactions
-
- CbPackage instances are marshaled across the wire using a distinct message
- format. We don't use the CbPackage serialization format provided by the
- CbPackage implementation itself since that does not provide much flexibility
- in how the attachment payloads are transmitted. The scheme below separates
- metadata cleanly from payloads and this enables us to more efficiently
- transmit them either via sendfile/TransmitFile like mechanisms, or by
- reference/memory mapping in the local case.
- */
-
-struct CbPackageHeader
-{
- uint32_t HeaderMagic;
- uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document?
- uint32_t Reserved1;
- uint32_t Reserved2;
-};
-
-static_assert(sizeof(CbPackageHeader) == 16);
-
-enum : uint32_t
-{
- kCbPkgMagic = 0xaa77aacc
-};
-
-struct CbAttachmentEntry
-{
- uint64_t PayloadSize; // Size of the associated payload data in the message
- uint32_t Flags; // See flags below
- IoHash AttachmentHash; // Content Id for the attachment
-
- enum
- {
- kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
- kIsObject = (1u << 1), // Is compact binary object
- kIsError = (1u << 2), // Is error (compact binary formatted) object
- kIsLocalRef = (1u << 3), // Is "local reference"
- };
-};
-
-struct CbAttachmentReferenceHeader
-{
- uint64_t PayloadByteOffset = 0;
- uint64_t PayloadByteSize = ~0u;
- uint16_t AbsolutePathLength = 0;
-
- // This header will be followed by UTF8 encoded absolute path to backing file
-};
-
-static_assert(sizeof(CbAttachmentEntry) == 32);
-
-enum class FormatFlags
-{
- kDefault = 0,
- kAllowLocalReferences = (1u << 0),
- kDenyPartialLocalReferences = (1u << 1)
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
-
-enum class RpcAcceptOptions : uint16_t
-{
- kNone = 0,
- kAllowLocalReferences = (1u << 0),
- kAllowPartialLocalReferences = (1u << 1),
- kAllowPartialCacheChunks = (1u << 2)
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions);
-
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle = nullptr);
-CbPackage ParsePackageMessage(
- IoBuffer Payload,
- std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
- return IoBuffer{Size};
- });
-bool IsPackageMessage(IoBuffer Payload);
-
-bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage);
-
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle = nullptr);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle = nullptr);
-
-/** Streaming reader for compact binary packages
-
- The goal is to ultimately support zero-copy I/O, but for now there'll be some
- copying involved on some platforms at least.
-
- This approach to deserializing CbPackage data is more efficient than
- `ParsePackageMessage` since it does not require the entire message to
- be resident in a memory buffer
-
- */
-class CbPackageReader
-{
-public:
- CbPackageReader();
- ~CbPackageReader();
-
- void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer);
-
- /** Process compact binary package data stream
-
- The data stream must be in the serialization format produced by FormatPackageMessage
-
- \return How many bytes must be fed to this function in the next call
- */
- uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes);
-
- void Finalize();
- const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; }
- CbObject GetRootObject() { return m_RootObject; }
- std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
-
-private:
- enum class State
- {
- kInitialState,
- kReadingHeader,
- kReadingAttachmentEntries,
- kReadingBuffers
- } m_CurrentState = State::kInitialState;
-
- std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer;
- std::vector<IoBuffer> m_PayloadBuffers;
- std::vector<CbAttachmentEntry> m_AttachmentEntries;
- std::vector<CbAttachment> m_Attachments;
- CbObject m_RootObject;
- CbPackageHeader m_PackageHeader;
-
- IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
-};
-
-void forcelink_packageformat();
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp
new file mode 100644
index 000000000..5e5da3750
--- /dev/null
+++ b/src/zenutil/jupiter/jupiterclient.cpp
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter/jupiterclient.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider)
+: m_Log(zen::logging::Get("jupiter"sv))
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
+, m_ComputeCluster(Options.ComputeCluster)
+, m_TokenProvider(std::move(TokenProvider))
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = std::move(TokenProvider),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
+{
+}
+
+JupiterClient::~JupiterClient()
+{
+}
+
+} // namespace zen
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
new file mode 100644
index 000000000..f706a7efc
--- /dev/null
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -0,0 +1,505 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter/jupitersession.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+//#include <cpr/cpr.h>
+//#include <fmt/format.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen {
+
+namespace detail {
+ JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ return {.Response = Response.ResponsePayload,
+ .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = 0,
+ .Success = true};
+ }
+} // namespace detail
+
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient)
+{
+}
+
+JupiterSession::~JupiterSession()
+{
+}
+
+JupiterResult
+JupiterSession::Authenticate()
+{
+ bool OK = m_HttpClient.Authenticate();
+ return {.Success = OK};
+}
+
+JupiterResult
+JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)});
+
+ return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv);
+}
+
+JupiterResult
+JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
+
+ JupiterResult Result = detail::ConvertResponse(Response);
+
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
+ {
+ const std::string& PayloadHashHeader = It->second;
+ if (PayloadHashHeader.length() == IoHash::StringLength)
+ {
+ OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
+ }
+ }
+
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObject");
+
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ return detail::ConvertResponse(Response);
+}
+
+PutRefResult
+JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutRef");
+
+ Ref.SetContentType(RefType);
+
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ Ref,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutRefResult Result = {detail::ConvertResponse(Response)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+FinalizeRefResult
+JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+
+ FinalizeRefResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutBlob");
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutObject");
+
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::RefExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+GetObjectReferencesResult
+JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ReferencesResponse = Response.AsObject();
+ for (auto& Item : ReferencesResponse["references"sv])
+ {
+ Result.References.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Key);
+}
+
+JupiterExistsResult
+JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Keys);
+}
+
+std::vector<IoHash>
+JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
+{
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
+
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
+
+ return {};
+}
+
+JupiterResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterExistsResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ ExtendableStringBuilder<256> Body;
+ Body << "[";
+ for (const auto& Key : Keys)
+ {
+ Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
+ }
+ Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ JupiterExistsResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ExistsResponse = Response.AsObject();
+ for (auto& Item : ExistsResponse["needs"sv])
+ {
+ Result.Needs.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv);
+}
+
+JupiterResult
+JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
+ return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv);
+}
+
+PutBuildPartResult
+JupiterSession::PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+
+ IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
+ Payload,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload)
+{
+ HttpClient::Response Response = m_HttpClient.Upload(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload,
+ ContentType);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath)
+{
+ HttpClient::Response Response = m_HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ TempFolderPath);
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Put(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
+}
+
+FinalizeBuildPartResult
+JupiterSession::FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash)
+{
+ HttpClient::Response Response = m_HttpClient.Post(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
+ HttpClient::Accept(ZenContentType::kCbObject));
+
+ FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
+}
+
+} // namespace zen
diff --git a/src/zenutil/packageformat.cpp b/src/zenutil/packageformat.cpp
deleted file mode 100644
index 579e0d13c..000000000
--- a/src/zenutil/packageformat.cpp
+++ /dev/null
@@ -1,894 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/packageformat.h>
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compositebuffer.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/scopeguard.h>
-#include <zencore/stream.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-#include <zencore/trace.h>
-
-#include <span>
-#include <vector>
-
-#if ZEN_PLATFORM_WINDOWS
-# include <zencore/windows.h>
-#endif
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-const std::string_view HandlePrefix(":?#:");
-
-std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data, void* TargetProcessHandle)
-{
- return FormatPackageMessage(Data, FormatFlags::kDefault, TargetProcessHandle);
-}
-CompositeBuffer
-FormatPackageMessageBuffer(const CbPackage& Data, void* TargetProcessHandle)
-{
- return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessHandle);
-}
-
-CompositeBuffer
-FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
-{
- return CompositeBuffer(FormatPackageMessage(Data, Flags, TargetProcessHandle));
-}
-
-static void
-MarshalLocal(CbAttachmentEntry*& AttachmentInfo,
- const std::string& Path8,
- CbAttachmentReferenceHeader& LocalRef,
- const IoHash& AttachmentHash,
- bool IsCompressed,
- std::vector<IoBuffer>& ResponseBuffers)
-{
- IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size());
-
- CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>();
- *RefHdr++ = LocalRef;
- memcpy(RefHdr, Path8.data(), Path8.size());
-
- *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(),
- .Flags = (IsCompressed ? uint32_t(CbAttachmentEntry::kIsCompressed) : 0u) | CbAttachmentEntry::kIsLocalRef,
- .AttachmentHash = AttachmentHash};
-
- ResponseBuffers.emplace_back(std::move(RefBuffer));
-};
-
-static bool
-IsLocalRef(tsl::robin_map<void*, std::string>& FileNameMap,
- std::vector<void*>& DuplicatedHandles,
- const CompositeBuffer& AttachmentBinary,
- bool DenyPartialLocalReferences,
- void* TargetProcessHandle,
- CbAttachmentReferenceHeader& LocalRef,
- std::string& Path8)
-{
- const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
- IoBufferFileReference Ref;
- const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
-
- if (!SegmentBuffer.GetFileReference(Ref))
- {
- return false;
- }
-
- if (DenyPartialLocalReferences && !SegmentBuffer.IsWholeFile())
- {
- return false;
- }
-
- if (auto It = FileNameMap.find(Ref.FileHandle); It != FileNameMap.end())
- {
- Path8 = It->second;
- }
- else
- {
- bool UseFilePath = true;
-#if ZEN_PLATFORM_WINDOWS
- if (TargetProcessHandle != nullptr)
- {
- HANDLE TargetHandle = INVALID_HANDLE_VALUE;
- BOOL OK = ::DuplicateHandle(GetCurrentProcess(),
- Ref.FileHandle,
- (HANDLE)TargetProcessHandle,
- &TargetHandle,
- FILE_GENERIC_READ,
- FALSE,
- 0);
- if (OK)
- {
- DuplicatedHandles.push_back((void*)TargetHandle);
- Path8 = fmt::format("{}{}", HandlePrefix, reinterpret_cast<uint64_t>(TargetHandle));
- UseFilePath = false;
- }
- }
-#else // ZEN_PLATFORM_WINDOWS
- ZEN_UNUSED(TargetProcessHandle);
- ZEN_UNUSED(DuplicatedHandles);
- // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes and to
- // deal with access rights etc.
-#endif // ZEN_PLATFORM_WINDOWS
- if (UseFilePath)
- {
- ExtendablePathBuilder<256> LocalRefFile;
- std::error_code Ec;
- std::filesystem::path FilePath = PathFromHandle(Ref.FileHandle, Ec);
- if (Ec)
- {
- ZEN_WARN("Failed to get path for file handle {} in IsLocalRef check, reason '{}'", Ref.FileHandle, Ec.message());
- return false;
- }
- LocalRefFile.Append(std::filesystem::absolute(FilePath));
- Path8 = LocalRefFile.ToUtf8();
- }
- FileNameMap.insert_or_assign(Ref.FileHandle, Path8);
- }
-
- LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size());
- LocalRef.PayloadByteOffset = Ref.FileChunkOffset;
- LocalRef.PayloadByteSize = Ref.FileChunkSize;
-
- return true;
-};
-
-std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, void* TargetProcessHandle)
-{
- ZEN_TRACE_CPU("FormatPackageMessage");
-
- std::vector<void*> DuplicatedHandles;
-#if ZEN_PLATFORM_WINDOWS
- auto _ = MakeGuard([&DuplicatedHandles, &TargetProcessHandle]() {
- if (TargetProcessHandle == nullptr)
- {
- return;
- }
-
- for (void* DuplicatedHandle : DuplicatedHandles)
- {
- HANDLE ClosingHandle;
- if (::DuplicateHandle((HANDLE)TargetProcessHandle,
- (HANDLE)DuplicatedHandle,
- GetCurrentProcess(),
- &ClosingHandle,
- 0,
- FALSE,
- DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS) == TRUE)
- {
- ::CloseHandle(ClosingHandle);
- }
- }
- });
-#endif // ZEN_PLATFORM_WINDOWS
-
- const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
- std::vector<IoBuffer> ResponseBuffers;
-
- ResponseBuffers.reserve(2 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each
- // attachment is likely to consist of several buffers
-
- IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbPackageHeader) + sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
- MutableMemoryView HeaderView = AttachmentMetadataBuffer.GetMutableView();
- // Fixed size header
-
- CbPackageHeader* Hdr = (CbPackageHeader*)HeaderView.GetData();
- *Hdr = {.HeaderMagic = kCbPkgMagic, .AttachmentCount = gsl::narrow<uint32_t>(Attachments.size())};
- HeaderView.MidInline(sizeof(CbPackageHeader));
-
- // Attachment metadata array
- CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(HeaderView.GetData());
- ResponseBuffers.emplace_back(std::move(AttachmentMetadataBuffer)); // Attachment metadata
-
- // Root object
-
- IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer();
- ZEN_ASSERT(RootIoBuffer.GetSize() > 0);
- *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()};
- ResponseBuffers.emplace_back(std::move(RootIoBuffer)); // Root object
-
- // Attachment payloads
- tsl::robin_map<void*, std::string> FileNameMap;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- if (Attachment.IsNull())
- {
- ZEN_NOT_IMPLEMENTED("Null attachments are not supported");
- }
- else if (const CompressedBuffer& AttachmentBuffer = Attachment.AsCompressedBinary())
- {
- const CompositeBuffer& Compressed = AttachmentBuffer.GetCompressed();
- IoHash AttachmentHash = Attachment.GetHash();
-
- // If the data is either not backed by a file, or there are multiple
- // fragments then we cannot marshal it by local reference. We might
- // want/need to extend this in the future to allow multiple chunk
- // segments to be marshaled at once
-
- bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1);
- bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences);
- CbAttachmentReferenceHeader LocalRef;
- std::string Path8;
-
- if (MarshalByLocalRef)
- {
- MarshalByLocalRef = IsLocalRef(FileNameMap,
- DuplicatedHandles,
- Compressed,
- DenyPartialLocalReferences,
- TargetProcessHandle,
- LocalRef,
- Path8);
- }
-
- if (MarshalByLocalRef)
- {
- const bool IsCompressed = true;
- bool IsHandle = false;
-#if ZEN_PLATFORM_WINDOWS
- IsHandle = Path8.starts_with(HandlePrefix);
-#endif
- MarshalLocal(AttachmentInfo, Path8, LocalRef, AttachmentHash, IsCompressed, ResponseBuffers);
- ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", Compressed.GetSize());
- }
- else
- {
- *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
- .Flags = CbAttachmentEntry::kIsCompressed,
- .AttachmentHash = AttachmentHash};
-
- std::span<const SharedBuffer> Segments = Compressed.GetSegments();
- ResponseBuffers.reserve(ResponseBuffers.size() + Segments.size() - 1);
- for (const SharedBuffer& Segment : Segments)
- {
- ZEN_ASSERT(Segment.GetSize() > 0);
- ResponseBuffers.emplace_back(Segment.AsIoBuffer());
- }
- }
- }
- else if (CbObject AttachmentObject = Attachment.AsObject())
- {
- IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer();
- ZEN_ASSERT(ObjIoBuffer.GetSize() > 0);
- ResponseBuffers.emplace_back(std::move(ObjIoBuffer));
-
- *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(),
- .Flags = CbAttachmentEntry::kIsObject,
- .AttachmentHash = Attachment.GetHash()};
- }
- else if (const CompositeBuffer& AttachmentBinary = Attachment.AsCompositeBinary())
- {
- IoHash AttachmentHash = Attachment.GetHash();
- bool MarshalByLocalRef =
- EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
- bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences);
-
- CbAttachmentReferenceHeader LocalRef;
- std::string Path8;
-
- if (MarshalByLocalRef)
- {
- MarshalByLocalRef = IsLocalRef(FileNameMap,
- DuplicatedHandles,
- AttachmentBinary,
- DenyPartialLocalReferences,
- TargetProcessHandle,
- LocalRef,
- Path8);
- }
-
- if (MarshalByLocalRef)
- {
- const bool IsCompressed = false;
- bool IsHandle = false;
-#if ZEN_PLATFORM_WINDOWS
- IsHandle = Path8.starts_with(HandlePrefix);
-#endif
- MarshalLocal(AttachmentInfo, Path8, LocalRef, AttachmentHash, IsCompressed, ResponseBuffers);
- ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", AttachmentBinary.GetSize());
- }
- else
- {
- *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
-
- std::span<const SharedBuffer> Segments = AttachmentBinary.GetSegments();
- ResponseBuffers.reserve(ResponseBuffers.size() + Segments.size() - 1);
- for (const SharedBuffer& Segment : Segments)
- {
- ZEN_ASSERT(Segment.GetSize() > 0);
- ResponseBuffers.emplace_back(Segment.AsIoBuffer());
- }
- }
- }
- else
- {
- ZEN_NOT_IMPLEMENTED("Unknown attachment kind");
- }
- }
- FileNameMap.clear();
-#if ZEN_PLATFORM_WINDOWS
- DuplicatedHandles.clear();
-#endif // ZEN_PLATFORM_WINDOWS
-
- return ResponseBuffers;
-}
-
-bool
-IsPackageMessage(IoBuffer Payload)
-{
- if (Payload.GetSize() < sizeof(CbPackageHeader))
- {
- return false;
- }
-
- BinaryReader Reader(Payload);
- const CbPackageHeader* Hdr = reinterpret_cast<const CbPackageHeader*>(Reader.GetView(sizeof(CbPackageHeader)).GetData());
- if (Hdr->HeaderMagic != kCbPkgMagic)
- {
- return false;
- }
-
- return true;
-}
-
-CbPackage
-ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer)
-{
- ZEN_TRACE_CPU("ParsePackageMessage");
-
- if (Payload.GetSize() < sizeof(CbPackageHeader))
- {
- throw std::invalid_argument(fmt::format("invalid CbPackage, missing complete header (size {})", Payload.GetSize()));
- }
-
- BinaryReader Reader(Payload);
-
- const CbPackageHeader* Hdr = reinterpret_cast<const CbPackageHeader*>(Reader.GetView(sizeof(CbPackageHeader)).GetData());
- if (Hdr->HeaderMagic != kCbPkgMagic)
- {
- throw std::invalid_argument(
- fmt::format("invalid CbPackage header magic, expected {0:x}, got {0:x}", static_cast<uint32_t>(kCbPkgMagic), Hdr->HeaderMagic));
- }
- Reader.Skip(sizeof(CbPackageHeader));
-
- const uint32_t ChunkCount = Hdr->AttachmentCount + 1;
-
- if (Reader.Remaining() < sizeof(CbAttachmentEntry) * ChunkCount)
- {
- throw std::invalid_argument(fmt::format("invalid CbPackage, missing attachment entry data (need {} bytes, have {} bytes)",
- sizeof(CbAttachmentEntry) * ChunkCount,
- Reader.Remaining()));
- }
- const CbAttachmentEntry* AttachmentEntries =
- reinterpret_cast<const CbAttachmentEntry*>(Reader.GetView(sizeof(CbAttachmentEntry) * ChunkCount).GetData());
- Reader.Skip(sizeof(CbAttachmentEntry) * ChunkCount);
-
- CbPackage Package;
-
- std::vector<CbAttachment> Attachments;
- Attachments.reserve(ChunkCount); // Guessing here...
-
- tsl::robin_map<std::string, IoBuffer> PartialFileBuffers;
-
- std::vector<std::pair<uint32_t, std::string>> MalformedAttachments;
-
- for (uint32_t i = 0; i < ChunkCount; ++i)
- {
- const CbAttachmentEntry& Entry = AttachmentEntries[i];
- const uint64_t AttachmentSize = Entry.PayloadSize;
-
- if (Reader.Remaining() < AttachmentSize)
- {
- throw std::invalid_argument(fmt::format("invalid CbPackage, missing attachment data (need {} bytes, have {} bytes)",
- AttachmentSize,
- Reader.Remaining()));
- }
- const IoBuffer AttachmentBuffer(Payload, Reader.CurrentOffset(), AttachmentSize);
- Reader.Skip(AttachmentSize);
-
- if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
- {
- // Marshal local reference - a "pointer" to the chunk backing file
-
- ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
-
- const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
- const char* PathPointer = reinterpret_cast<const char*>(AttachRefHdr + 1);
-
- ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
- std::string_view PathView(PathPointer, AttachRefHdr->AbsolutePathLength);
-
- IoBuffer FullFileBuffer;
-
- std::filesystem::path Path(Utf8ToWide(PathView));
- if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end())
- {
- FullFileBuffer = It->second;
- }
- else
- {
- if (PathView.starts_with(HandlePrefix))
- {
-#if ZEN_PLATFORM_WINDOWS
- std::string_view HandleString(PathView.substr(HandlePrefix.length()));
- std::optional<uint64_t> HandleNumber(ParseInt<uint64_t>(HandleString));
- if (HandleNumber.has_value())
- {
- HANDLE FileHandle = HANDLE(HandleNumber.value());
- ULARGE_INTEGER liFileSize;
- liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart);
- if (liFileSize.LowPart != INVALID_FILE_SIZE)
- {
- FullFileBuffer =
- IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart), /*IsWholeFile*/ true);
- PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer);
- }
- }
-#else // ZEN_PLATFORM_WINDOWS
- // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes
- // and to deal with acceess rights etc.
- ZEN_ASSERT(false);
-#endif // ZEN_PLATFORM_WINDOWS
- }
- else
- {
- FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second;
- }
- }
-
- if (FullFileBuffer)
- {
- IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize()
- ? FullFileBuffer
- : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
-
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference)));
- if (CompBuf)
- {
- Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash));
- }
- else
- {
- MalformedAttachments.push_back(std::make_pair(i,
- fmt::format("Invalid format in '{}' (offset {}, size {}) for {}",
- Path,
- AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize,
- Entry.AttachmentHash)));
- }
- }
- else
- {
- MalformedAttachments.push_back(std::make_pair(i,
- fmt::format("Unable to resolve chunk at '{}' (offset {}, size {}) for {}",
- Path,
- AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize,
- Entry.AttachmentHash)));
- }
- }
- else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
- {
- if (Entry.Flags & CbAttachmentEntry::kIsObject)
- {
- if (i == 0)
- {
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer)));
- if (CompBuf)
- {
- Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
- }
- else
- {
- // First payload is always a compact binary object
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, expected compressed buffer for CbObject (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
- }
- }
- else
- {
- MalformedAttachments.push_back(std::make_pair(
- i,
- fmt::format("Invalid format, compressed object attachments are not currently supported (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
- }
- }
- else
- {
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(IoBuffer(AttachmentBuffer)));
- if (CompBuf)
- {
- Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash));
- }
- else
- {
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, expected compressed buffer for attachment (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
- }
- }
- }
- else /* not compressed */
- {
- if (Entry.Flags & CbAttachmentEntry::kIsObject)
- {
- if (i == 0)
- {
- Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer));
- }
- else
- {
- MalformedAttachments.push_back(
- std::make_pair(i,
- fmt::format("Invalid format, object attachments are not currently supported (size {}) for {}",
- AttachmentBuffer.GetSize(),
- Entry.AttachmentHash)));
- }
- }
- else if (AttachmentSize > 0)
- {
- // Make a copy of the buffer so the attachments don't reference the entire payload
- IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize);
- ZEN_ASSERT(AttachmentBufferCopy);
- ZEN_ASSERT(AttachmentBufferCopy.Size() == AttachmentSize);
- AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView());
-
- Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy});
- }
- else
- {
- MalformedAttachments.push_back(
- std::make_pair(i, fmt::format("Invalid format, attachment of size zero detected for {}", Entry.AttachmentHash)));
- }
- }
- }
- PartialFileBuffers.clear();
-
- Package.AddAttachments(Attachments);
-
- using namespace std::literals;
-
- if (!MalformedAttachments.empty())
- {
- StringBuilder<1024> SB;
- SB << (uint64_t)MalformedAttachments.size() << " malformed attachments in package message:\n";
- for (const auto& It : MalformedAttachments)
- {
- SB << " #"sv << It.first << ": " << It.second << "\n";
- }
- ZEN_WARN("{}", SB.ToView());
- throw std::invalid_argument(SB.ToString());
- }
-
- return Package;
-}
-
-bool
-ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage)
-{
- if (IsPackageMessage(Response))
- {
- OutPackage = ParsePackageMessage(Response);
- return true;
- }
- return OutPackage.TryLoad(Response);
-}
-
-CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; })
-{
-}
-
-CbPackageReader::~CbPackageReader()
-{
-}
-
-void
-CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer)
-{
- m_CreateBuffer = CreateBuffer;
-}
-
-uint64_t
-CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes)
-{
- ZEN_ASSERT(m_CurrentState != State::kReadingBuffers);
-
- switch (m_CurrentState)
- {
- case State::kInitialState:
- ZEN_ASSERT(Data == nullptr);
- m_CurrentState = State::kReadingHeader;
- return sizeof m_PackageHeader;
-
- case State::kReadingHeader:
- ZEN_ASSERT(DataBytes == sizeof m_PackageHeader);
- memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader);
- ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic);
- m_CurrentState = State::kReadingAttachmentEntries;
- m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1);
- return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry);
-
- case State::kReadingAttachmentEntries:
- ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry)));
- memcpy(m_AttachmentEntries.data(), Data, DataBytes);
-
- for (CbAttachmentEntry& Entry : m_AttachmentEntries)
- {
- // This preallocates memory for payloads but note that for the local references
- // the caller will need to handle the payload differently (i.e it's a
- // CbAttachmentReferenceHeader not the actual payload)
-
- m_PayloadBuffers.emplace_back(IoBuffer{Entry.PayloadSize});
- }
-
- m_CurrentState = State::kReadingBuffers;
- return 0;
-
- default:
- ZEN_ASSERT(false);
- return 0;
- }
-}
-
-IoBuffer
-CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer)
-{
- // Marshal local reference - a "pointer" to the chunk backing file
-
- ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
-
- const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
- const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
-
- ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
-
- std::u8string_view PathView{PathPointer, AttachRefHdr->AbsolutePathLength};
-
- std::filesystem::path Path{PathView};
-
- IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
-
- if (!ChunkReference)
- {
- // Unable to open chunk reference
-
- throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})",
- PathToUtf8(Path),
- AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize));
- }
-
- return ChunkReference;
-};
-
-void
-CbPackageReader::Finalize()
-{
- if (m_AttachmentEntries.empty())
- {
- return;
- }
-
- m_Attachments.reserve(m_AttachmentEntries.size() - 1);
-
- int CurrentAttachmentIndex = 0;
- for (CbAttachmentEntry& Entry : m_AttachmentEntries)
- {
- IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex];
-
- if (CurrentAttachmentIndex == 0)
- {
- // Root object
- if (Entry.Flags & CbAttachmentEntry::kIsObject)
- {
- if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
- {
- m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer));
- }
- else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer), RawHash, RawSize);
- if (RawHash == Entry.AttachmentHash)
- {
- m_RootObject = LoadCompactBinaryObject(Compressed);
- }
- }
- else
- {
- m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer));
- }
- }
- else
- {
- throw std::runtime_error("missing or invalid root object");
- }
- }
- else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
- {
- IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer);
-
- if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference), RawHash, RawSize);
- if (RawHash == Entry.AttachmentHash)
- {
- m_Attachments.emplace_back(CbAttachment(Compressed, Entry.AttachmentHash));
- }
- }
- else
- {
- CompressedBuffer Compressed =
- CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None);
- m_Attachments.emplace_back(CbAttachment(std::move(Compressed), Compressed.DecodeRawHash()));
- }
- }
-
- ++CurrentAttachmentIndex;
- }
-}
-
-/**
- ______________________ _____________________________
- \__ ___/\_ _____// _____/\__ ___/ _____/
- | | | __)_ \_____ \ | | \_____ \
- | | | \/ \ | | / \
- |____| /_______ /_______ / |____| /_______ /
- \/ \/ \/
- */
-
-#if ZEN_WITH_TESTS
-
-TEST_CASE("CbPackage.Serialization")
-{
- // Make a test package
-
- CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))};
- CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))};
-
- CbObjectWriter Cbo;
- Cbo.AddAttachment("abcd", Attach1);
- Cbo.AddAttachment("efgh", Attach2);
-
- CbPackage Pkg;
- Pkg.AddAttachment(Attach1);
- Pkg.AddAttachment(Attach2);
- Pkg.SetObject(Cbo.Save());
-
- SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten();
- const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
- uint64_t RemainingBytes = Buffer.GetSize();
-
- auto ConsumeBytes = [&](uint64_t ByteCount) {
- ZEN_ASSERT(ByteCount <= RemainingBytes);
- void* ReturnPtr = (void*)CursorPtr;
- CursorPtr += ByteCount;
- RemainingBytes -= ByteCount;
- return ReturnPtr;
- };
-
- auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
- ZEN_ASSERT(ByteCount <= RemainingBytes);
- memcpy(TargetBuffer, CursorPtr, ByteCount);
- CursorPtr += ByteCount;
- RemainingBytes -= ByteCount;
- };
-
- CbPackageReader Reader;
- uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
- uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
- NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
- auto Buffers = Reader.GetPayloadBuffers();
-
- for (auto& PayloadBuffer : Buffers)
- {
- CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
- }
-
- Reader.Finalize();
-}
-
-TEST_CASE("CbPackage.EmptyObject")
-{
- CbPackage Pkg;
- Pkg.SetObject({});
- std::vector<IoBuffer> Result = FormatPackageMessage(Pkg, nullptr);
-}
-
-TEST_CASE("CbPackage.LocalRef")
-{
- ScopedTemporaryDirectory TempDir;
-
- auto Path1 = TempDir.Path() / "abcd";
- auto Path2 = TempDir.Path() / "efgh";
-
- {
- IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd"));
- IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh"));
-
- WriteFile(Path1, Buffer1);
- WriteFile(Path2, Buffer2);
- }
-
- // Make a test package
-
- IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1);
- IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2);
-
- CbAttachment Attach1{SharedBuffer(FileBuffer1)};
- CbAttachment Attach2{SharedBuffer(FileBuffer2)};
-
- CbObjectWriter Cbo;
- Cbo.AddAttachment("abcd", Attach1);
- Cbo.AddAttachment("efgh", Attach2);
-
- CbPackage Pkg;
- Pkg.AddAttachment(Attach1);
- Pkg.AddAttachment(Attach2);
- Pkg.SetObject(Cbo.Save());
-
- SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten();
- const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
- uint64_t RemainingBytes = Buffer.GetSize();
-
- auto ConsumeBytes = [&](uint64_t ByteCount) {
- ZEN_ASSERT(ByteCount <= RemainingBytes);
- void* ReturnPtr = (void*)CursorPtr;
- CursorPtr += ByteCount;
- RemainingBytes -= ByteCount;
- return ReturnPtr;
- };
-
- auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
- ZEN_ASSERT(ByteCount <= RemainingBytes);
- memcpy(TargetBuffer, CursorPtr, ByteCount);
- CursorPtr += ByteCount;
- RemainingBytes -= ByteCount;
- };
-
- CbPackageReader Reader;
- uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
- uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
- NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
- auto Buffers = Reader.GetPayloadBuffers();
-
- for (auto& PayloadBuffer : Buffers)
- {
- CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
- }
-
- Reader.Finalize();
-}
-
-void
-forcelink_packageformat()
-{
-}
-
-#endif
-
-} // namespace zen
diff --git a/src/zenutil/xmake.lua b/src/zenutil/xmake.lua
index 744dff737..3d95651f2 100644
--- a/src/zenutil/xmake.lua
+++ b/src/zenutil/xmake.lua
@@ -6,5 +6,5 @@ target('zenutil')
add_headerfiles("**.h")
add_files("**.cpp")
add_includedirs("include", {public=true})
- add_deps("zencore")
+ add_deps("zencore", "zenhttp")
add_packages("vcpkg::robin-map", "vcpkg::spdlog")
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index 214737425..b36f11741 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -2,6 +2,7 @@
#include "zenutil/zenserverprocess.h"
+#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/except.h>
@@ -12,7 +13,6 @@
#include <zencore/string.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
-#include <zenutil/basicfile.h>
#include <atomic>
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 97ebeb01d..c54144549 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -4,20 +4,16 @@
#if ZEN_WITH_TESTS
-# include <zenutil/basicfile.h>
# include <zenutil/cache/cacherequests.h>
# include <zenutil/cache/rpcrecording.h>
-# include <zenutil/packageformat.h>
namespace zen {
void
zenutil_forcelinktests()
{
- basicfile_forcelink();
cachepolicy_forcelink();
cache::rpcrecord_forcelink();
- forcelink_packageformat();
cacherequests_forcelink();
}