aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-11-18 11:35:13 +0100
committerGitHub <[email protected]>2022-11-18 02:35:13 -0800
commit55225621f018904abf7e212320bb784dc64f8ac3 (patch)
tree3fb962e9e0553448f9d42612bb078ff072308e1c
parentmove BasicFile to zenutil to remove zenstore dependency from zen command (#190) (diff)
downloadzen-55225621f018904abf7e212320bb784dc64f8ac3.tar.xz
zen-55225621f018904abf7e212320bb784dc64f8ac3.zip
Add `import-project` and `export-project` (#183)
* Add `import-project` and `export-project` command line parsing
-rw-r--r--zen/cmds/exportproject.cpp376
-rw-r--r--zen/cmds/exportproject.h79
-rw-r--r--zen/cmds/importproject.cpp246
-rw-r--r--zen/cmds/importproject.h22
-rw-r--r--zen/zen.cpp21
-rw-r--r--zencore/compactbinary.cpp26
-rw-r--r--zencore/compactbinarypackage.cpp138
-rw-r--r--zencore/compositebuffer.cpp94
-rw-r--r--zencore/compress.cpp44
-rw-r--r--zencore/include/zencore/compactbinarypackage.h30
-rw-r--r--zencore/include/zencore/compositebuffer.h9
-rw-r--r--zencore/include/zencore/compress.h9
-rw-r--r--zencore/include/zencore/stream.h2
-rw-r--r--zencore/stream.cpp36
-rw-r--r--zenhttp/httpshared.cpp91
-rw-r--r--zenserver-test/zenserver-test.cpp21
-rw-r--r--zenserver/cache/structuredcache.cpp15
-rw-r--r--zenserver/projectstore.cpp837
-rw-r--r--zenserver/projectstore.h20
-rw-r--r--zenserver/upstream/hordecompute.cpp2
-rw-r--r--zenserver/upstream/upstreamcache.cpp11
-rw-r--r--zenstore/cas.cpp6
-rw-r--r--zenstore/cas.h24
-rw-r--r--zenstore/cidstore.cpp8
-rw-r--r--zenstore/filecas.cpp15
-rw-r--r--zenstore/filecas.h4
-rw-r--r--zenstore/include/zenstore/cidstore.h7
-rw-r--r--zenutil/basicfile.cpp2
-rw-r--r--zenutil/cache/cacherequests.cpp17
29 files changed, 1807 insertions, 405 deletions
diff --git a/zen/cmds/exportproject.cpp b/zen/cmds/exportproject.cpp
new file mode 100644
index 000000000..5d8c7d536
--- /dev/null
+++ b/zen/cmds/exportproject.cpp
@@ -0,0 +1,376 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "exportproject.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/stream.h>
+#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
+#include <zenutil/basicfile.h>
+
+#include <memory>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace {
+
+void
+EnsureDirectoryExists(const std::filesystem::path& Path)
+{
+ while (!std::filesystem::is_directory(Path))
+ {
+ if (Path.has_parent_path())
+ {
+ EnsureDirectoryExists(Path.parent_path());
+ }
+ std::filesystem::create_directory(Path);
+ }
+}
+
+} // namespace
+
+ExportProjectCommand::ExportProjectCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
+}
+
+ExportProjectCommand::~ExportProjectCommand() = default;
+
+bool
+ExportProjectCommand::IsSuccess(const cpr::Response& Response, const std::string_view Operation)
+{
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ if (Response.status_code)
+ {
+ ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text);
+ }
+ else
+ {
+ ZEN_ERROR("{} failed: {}", Operation, Response.error.message);
+ }
+
+ return false;
+ }
+ return true;
+}
+
+int
+ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ m_Options.parse_positional({"target", "project", "oplog"});
+ m_Options.parse(argc, argv);
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_TargetPath.empty())
+ {
+ ZEN_ERROR("Target path must be given");
+ return 1;
+ }
+
+ if (!std::filesystem::exists(m_TargetPath))
+ {
+ zen::CreateDirectories(m_TargetPath);
+ }
+ else if (!std::filesystem::is_directory(m_TargetPath))
+ {
+ ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath);
+ return 1;
+ }
+
+ const std::string UrlBase = fmt::format("{}/prj", m_HostName);
+
+ cpr::Session Session;
+ {
+ ZEN_INFO("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
+
+ std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
+ Session.SetUrl({ProjectRequest});
+ cpr::Response Response = Session.Get();
+ if (!IsSuccess(Response, ProjectRequest))
+ {
+ return 1;
+ }
+ zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::BasicFile ProjectStore;
+ ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate);
+ ProjectStore.Write(Payload.GetView(), 0);
+
+ if (m_OplogNames.empty())
+ {
+ zen::CbObject Params = LoadCompactBinaryObject(Payload);
+ zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView();
+ for (auto& OplogEntry : Oplogs)
+ {
+ std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString();
+ m_OplogNames.push_back(std::string(OpLog));
+ }
+ }
+ }
+
+ std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks;
+ std::vector<zen::CbAttachment> AllAttachments;
+ std::vector<zen::CbPackage> OplogResponses;
+ for (const std::string& OplogName : m_OplogNames)
+ {
+ ZEN_INFO("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath);
+
+ std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl({GetOplogArchiveRequest});
+ Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
+ cpr::Response Response = Session.Get();
+ if (!IsSuccess(Response, GetOplogArchiveRequest))
+ {
+ return 1;
+ }
+ zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressed(zen::SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer();
+
+ OplogResponses.emplace_back(zen::ParsePackageMessage(Payload));
+ zen::CbPackage& ResponsePackage = OplogResponses.back();
+ zen::CbObject Result = ResponsePackage.GetObject();
+
+ zen::IoHash Checksum = Result["checksum"sv].AsHash();
+ zen ::CbArrayView Entries = Result["entries"sv].AsArrayView();
+
+ ZEN_INFO("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'",
+ Entries.Num(),
+ m_ProjectName,
+ OplogName,
+ Checksum,
+ m_TargetPath);
+ {
+ zen::BasicFile OpStore;
+ OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate);
+ OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum};
+ OpStore.Write(&Header, sizeof(OplogHeader), 0);
+ std::vector<OplogEntry> OpEntries;
+ OpEntries.resize(Entries.Num());
+ const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry);
+ uint64_t BulkOffset = DataOffset;
+
+ zen::IoHashStream Hasher;
+
+ for (uint64_t OpIndex = 0; auto& OpEntry : Entries)
+ {
+ zen::BinaryWriter Writer;
+ OpEntry.CopyTo(Writer);
+ zen::MemoryView OpView = Writer.GetView();
+ Hasher.Append(OpView);
+
+ OpEntries[OpIndex].Offset = BulkOffset;
+ OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize());
+ OpStore.Write(OpView, BulkOffset);
+ BulkOffset += OpView.GetSize();
+ OpIndex++;
+ }
+ zen::IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return 1;
+ }
+ OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader));
+ }
+
+ std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments();
+ AllAttachments.reserve(AllAttachments.size() + Attachments.size());
+ AllAttachments.reserve(UniqueChunks.size() + Attachments.size());
+ for (const zen::CbAttachment& Attachment : Attachments)
+ {
+ if (UniqueChunks.insert(Attachment.GetHash()).second)
+ {
+ AllAttachments.push_back(Attachment);
+ }
+ }
+
+ ZEN_INFO("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName);
+ }
+
+ size_t ChunkCount = AllAttachments.size();
+ zen::BasicFile ChunkStoreIndex;
+ ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate);
+ ChunksHeader Header = {.ChunkCount = ChunkCount};
+ ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0);
+ std::vector<ChunkEntry> ChunkEntries;
+ ChunkEntries.resize(ChunkCount);
+ uint64_t ChunkOffset = 0;
+
+ zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ std::atomic_int64_t JobCount = 0;
+ std::vector<size_t> BlockChunkIndexes;
+ const size_t BlockSize = 1ull << Header.BlockSizeShift;
+ uint32_t CurrentBlockIndex = 0;
+
+ auto WriteBlockAsync = [](const std::string& TargetPath,
+ size_t WriteBlockOffset,
+ uint32_t BlockIndex,
+ const std::vector<size_t>& BlockChunkIndexes,
+ const std::vector<ChunkEntry>& ChunkEntries,
+ const std::vector<zen::CbAttachment>& Attachments,
+ zen::WorkerThreadPool& WorkerPool,
+ std::atomic_int64_t& JobCount) {
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() {
+ zen::BasicFile ChunkBlock;
+ ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate);
+ for (size_t ChunkIndex : BlockChunkIndexes)
+ {
+ const ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
+ zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed();
+ size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset;
+ for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset);
+ AttachmentBulkOffset += SegmentSize;
+ }
+ }
+ JobCount.fetch_add(-1);
+ });
+ };
+
+ ZEN_INFO("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
+ for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments)
+ {
+ ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
+ Chunk.ChunkHash = Attachment.GetHash();
+ zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed();
+ Chunk.Length = AttachmentBody.GetSize();
+
+ if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize);
+ if (BlockIndex != CurrentBlockIndex)
+ {
+ size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
+ WriteBlockAsync(m_TargetPath,
+ WriteBlockOffset,
+ CurrentBlockIndex,
+ BlockChunkIndexes,
+ ChunkEntries,
+ AllAttachments,
+ WorkerPool,
+ JobCount);
+
+ ChunkOffset = BlockIndex * BlockSize;
+ CurrentBlockIndex = BlockIndex;
+ BlockChunkIndexes.clear();
+ }
+
+ Chunk.Offset = ChunkOffset;
+ ChunkOffset = Chunk.Offset + Chunk.Length;
+ BlockChunkIndexes.push_back(ChunkIndex);
+ }
+ else
+ {
+ Chunk.Offset = ~0ull;
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() {
+ std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash);
+ EnsureDirectoryExists(Path.parent_path());
+ zen::BasicFile ChunkFile;
+ ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate);
+ uint64_t Offset = 0;
+ for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset);
+ Offset += SegmentSize;
+ }
+ JobCount.fetch_add(-1);
+ });
+ }
+ ChunkIndex++;
+ }
+ if (!BlockChunkIndexes.empty())
+ {
+ size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
+ WriteBlockAsync(m_TargetPath,
+ WriteBlockOffset,
+ CurrentBlockIndex,
+ BlockChunkIndexes,
+ ChunkEntries,
+ AllAttachments,
+ WorkerPool,
+ JobCount);
+ }
+
+ while (JobCount.load())
+ {
+ zen::Sleep(1);
+ }
+
+ ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader));
+
+ ZEN_INFO("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
+
+ return 0;
+}
+
+std::filesystem::path
+ExportProjectCommand::GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog)
+{
+ return RootPath / (Oplog + ".ops");
+}
+
+std::filesystem::path
+ExportProjectCommand::GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash)
+{
+ zen::ExtendablePathBuilder<128> ShardedPath;
+ ShardedPath.Append(RootPath.c_str());
+ zen::ExtendableStringBuilder<64> HashString;
+ OpHash.ToHexString(HashString);
+ const char* str = HashString.c_str();
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str, str + 3);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 3, str + 5);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 5, str + 40);
+
+ return ShardedPath.ToPath();
+}
+
+std::filesystem::path
+ExportProjectCommand::GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName)
+{
+ return RootPath / (std::string(ProjectName) + ".zcb");
+}
+
+std::filesystem::path
+ExportProjectCommand::GetChunksIndexPath(const std::filesystem::path RootPath)
+{
+ return RootPath / "chunks.idx";
+}
+
+std::filesystem::path
+ExportProjectCommand::GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex)
+{
+ return RootPath / fmt::format("chunks{}.bin", BlockIndex);
+}
diff --git a/zen/cmds/exportproject.h b/zen/cmds/exportproject.h
new file mode 100644
index 000000000..bf41e2d7b
--- /dev/null
+++ b/zen/cmds/exportproject.h
@@ -0,0 +1,79 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/iohash.h>
+
+#include <functional>
+
+namespace zen {
+class CbObjectView;
+}
+namespace cpr {
+class Response;
+}
+
+class ExportProjectCommand : public ZenCmdBase
+{
+public:
+ ExportProjectCommand();
+ ~ExportProjectCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+ struct OplogHeader
+ {
+ enum uint32
+ {
+ kMagic = 0x7816'B013
+ };
+ uint32_t Magic = kMagic;
+ uint32_t HeaderSize = sizeof(OplogHeader);
+ uint64_t OpCount = 0;
+ zen::IoHash Checksum = zen::IoHash::Zero;
+ };
+ struct OplogEntry
+ {
+ uint64_t Offset;
+ uint32_t OpLength;
+ };
+
+ struct ChunksHeader
+ {
+ enum uint32
+ {
+ kMagic = 0x574C'B016
+ };
+ uint32_t Magic = kMagic;
+ uint64_t ChunkCount = 0;
+ uint8_t BlockSizeShift = 31u;
+ uint8_t Reserved1 = 0;
+ uint16_t Reserved2 = 0;
+ uint32_t Reserved3 = 0;
+ };
+ struct ChunkEntry
+ {
+ zen::IoHash ChunkHash;
+ uint64_t Offset;
+ uint64_t Length;
+ };
+
+ static std::filesystem::path GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog);
+ static std::filesystem::path GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash);
+ static std::filesystem::path GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName);
+ static std::filesystem::path GetChunksIndexPath(const std::filesystem::path RootPath);
+ static std::filesystem::path GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex);
+
+ static bool IsSuccess(const cpr::Response& Response, const std::string_view Operation);
+
+private:
+ cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"};
+ std::string m_HostName;
+ std::string m_TargetPath;
+ std::string m_ProjectName;
+ std::vector<std::string> m_OplogNames;
+};
diff --git a/zen/cmds/importproject.cpp b/zen/cmds/importproject.cpp
new file mode 100644
index 000000000..d6c48cdeb
--- /dev/null
+++ b/zen/cmds/importproject.cpp
@@ -0,0 +1,246 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "importproject.h"
+#include "exportproject.h"
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/stream.h>
+#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
+#include <zenutil/basicfile.h>
+
+#include <memory>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+ImportProjectCommand::ImportProjectCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
+}
+
+ImportProjectCommand::~ImportProjectCommand() = default;
+
+int
+ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ m_Options.parse_positional({"source", "project", "oplog"});
+ m_Options.parse(argc, argv);
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_SourcePath.empty())
+ {
+ ZEN_ERROR("Source path must be given");
+ return 1;
+ }
+
+ if (!std::filesystem::is_directory(m_SourcePath))
+ {
+ ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath);
+ return 1;
+ }
+
+ const std::string UrlBase = fmt::format("{}/prj", m_HostName);
+
+ cpr::Session Session;
+
+ {
+ ZEN_INFO("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
+
+ zen::BasicFile ProjectStore;
+ ProjectStore.Open(ExportProjectCommand::GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead);
+ zen::IoBuffer Payload = ProjectStore.ReadAll();
+
+ std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
+ Session.SetUrl({ProjectRequest});
+ Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ cpr::Response Response = Session.Post();
+ if (!ExportProjectCommand::IsSuccess(Response, ProjectRequest))
+ {
+ return 1;
+ }
+ if (m_OplogNames.empty())
+ {
+ zen::DirectoryContent Content;
+ zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content);
+ for (auto& File : Content.Files)
+ {
+ if (File.extension() == ".ops")
+ {
+ m_OplogNames.push_back(File.stem().string());
+ }
+ }
+ }
+ }
+
+ zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksIndexPath(m_SourcePath));
+ zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ExportProjectCommand::ChunksHeader));
+ const ExportProjectCommand::ChunksHeader* ChunksHeader =
+ reinterpret_cast<const ExportProjectCommand::ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData());
+
+ if (ChunksHeader->Magic != ExportProjectCommand::ChunksHeader::kMagic)
+ {
+ ZEN_ERROR("Invalid chunk index header");
+ return 1;
+ }
+ const size_t BlockSize = 1ull << ChunksHeader->BlockSizeShift;
+
+ zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex,
+ sizeof(ExportProjectCommand::ChunksHeader),
+ sizeof(ExportProjectCommand::ChunkEntry) * ChunksHeader->ChunkCount);
+ const ExportProjectCommand::ChunkEntry* ChunkEntries =
+ reinterpret_cast<const ExportProjectCommand::ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData());
+
+ for (const std::string& OplogName : m_OplogNames)
+ {
+ ZEN_INFO("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName);
+ std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl(GetOplogRequest);
+ cpr::Response OplogResponse = Session.Get();
+ if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ {
+ OplogResponse = Session.Post();
+ if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest))
+ {
+ return 1;
+ }
+ ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest);
+ OplogResponse = Session.Get();
+ if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest))
+ {
+ return 1;
+ }
+ }
+
+ zen::BasicFile OpStore;
+ OpStore.Open(ExportProjectCommand::GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead);
+ ExportProjectCommand::OplogHeader OplogHeader;
+ OpStore.Read(&OplogHeader, sizeof(ExportProjectCommand::OplogHeader), 0);
+ if (OplogHeader.Magic != ExportProjectCommand::OplogHeader::kMagic ||
+ OplogHeader.HeaderSize != sizeof(ExportProjectCommand::OplogHeader))
+ {
+ ZEN_ERROR("Invalid oplog header");
+ return 1;
+ }
+ zen::IoHash Checksum = OplogHeader.Checksum;
+ std::vector<ExportProjectCommand::OplogEntry> OpEntries;
+ OpEntries.resize(OplogHeader.OpCount);
+ OpStore.Read(OpEntries.data(),
+ sizeof(ExportProjectCommand::OplogEntry) * OplogHeader.OpCount,
+ sizeof(ExportProjectCommand::OplogHeader));
+
+ ZEN_INFO("Constructing oplog with {} ops with checksum '{}' for '{}/{}'",
+ OpEntries.size(),
+ OplogHeader.Checksum,
+ m_ProjectName,
+ OplogName);
+ zen::IoHashStream Hasher;
+ zen::CbObjectWriter Request;
+ Request.BeginArray("entries"sv);
+ for (auto& OpEntry : OpEntries)
+ {
+ zen::IoBuffer CoreData(OpEntry.OpLength);
+ OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset);
+ Hasher.Append(CoreData.GetView());
+ zen::SharedBuffer SharedCoreData(CoreData);
+
+ zen::CbObject Op(SharedCoreData);
+ Request << Op;
+ }
+ Request.EndArray();
+ zen::IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return 1;
+ }
+ Request.AddHash("checksum"sv, Checksum);
+
+ zen::CbPackage RequestPackage;
+ RequestPackage.SetObject(Request.Save());
+
+ ZEN_INFO("Assembling {} attachments", ChunksHeader->ChunkCount);
+ std::vector<zen::CbAttachment> Attachments;
+ Attachments.reserve(ChunksHeader->ChunkCount);
+ uint32_t ReadBlockIndex = 0;
+ zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex));
+ for (uint64_t ChunkIndex = 0; ChunkIndex < ChunksHeader->ChunkCount; ++ChunkIndex)
+ {
+ const ExportProjectCommand::ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex];
+ if (ChunkEntry.Offset == ~0ull)
+ {
+ zen::IoBuffer ChunkBuffer =
+ zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash));
+ zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
+ ZEN_ASSERT(Chunk);
+ Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
+ }
+ else
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize);
+ if (BlockIndex != ReadBlockIndex)
+ {
+ ReadBlockIndex = BlockIndex;
+ BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex));
+ ZEN_ASSERT(BlockStore);
+ }
+ size_t BlockOffset = BlockIndex * BlockSize;
+ size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset;
+ zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length);
+ zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
+ Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
+ }
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ ZEN_INFO("Sending oplog with {} ops and {} attachments for '{}/{}' to {}",
+ OpEntries.size(),
+ ChunksHeader->ChunkCount,
+ m_ProjectName,
+ OplogName,
+ m_HostName);
+ std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences);
+ std::vector<zen::SharedBuffer> Parts;
+ Parts.reserve(RequestPayload.size());
+ for (const auto& I : RequestPayload)
+ {
+ Parts.emplace_back(zen::SharedBuffer(I));
+ }
+ zen::CompositeBuffer Cmp(std::move(Parts));
+ zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp);
+
+ std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl(AppendOplogRequest);
+
+ zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer();
+ Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()});
+ cpr::Response Response = Session.Post();
+ if (!ExportProjectCommand::IsSuccess(Response, AppendOplogRequest))
+ {
+ return 1;
+ }
+
+ ZEN_INFO("Imported {} ops and {} chunks", OpEntries.size(), ChunksHeader->ChunkCount);
+ }
+ return 0;
+}
diff --git a/zen/cmds/importproject.h b/zen/cmds/importproject.h
new file mode 100644
index 000000000..326ccfe92
--- /dev/null
+++ b/zen/cmds/importproject.h
@@ -0,0 +1,22 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+class ImportProjectCommand : public ZenCmdBase
+{
+public:
+ ImportProjectCommand();
+ ~ImportProjectCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options* Options() override { return &m_Options; }
+
+private:
+ cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"};
+ std::string m_HostName;
+ std::string m_SourcePath;
+ std::string m_ProjectName;
+ std::vector<std::string> m_OplogNames;
+};
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 5abe4b04e..a736d740e 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -9,7 +9,9 @@
#include "cmds/cache.h"
#include "cmds/copy.h"
#include "cmds/dedup.h"
+#include "cmds/exportproject.h"
#include "cmds/hash.h"
+#include "cmds/importproject.h"
#include "cmds/print.h"
#include "cmds/run.h"
#include "cmds/status.h"
@@ -103,6 +105,7 @@ main(int argc, char** argv)
#endif
zen::logging::InitializeLogging();
+ MaximizeOpenFileCount();
//////////////////////////////////////////////////////////////////////////
@@ -115,13 +118,15 @@ main(int argc, char** argv)
#if ZEN_WITH_EXEC_SERVICES
RunCommand RunCmd;
#endif
- StatusCommand StatusCmd;
- TopCommand TopCmd;
- PrintCommand PrintCmd;
- PrintPackageCommand PrintPkgCmd;
- PsCommand PsCmd;
- UpCommand UpCmd;
- DownCommand DownCmd;
+ StatusCommand StatusCmd;
+ TopCommand TopCmd;
+ PrintCommand PrintCmd;
+ PrintPackageCommand PrintPkgCmd;
+ PsCommand PsCmd;
+ UpCommand UpCmd;
+ DownCommand DownCmd;
+ ExportProjectCommand ExportProjectCmd;
+ ImportProjectCommand ImportProjectCmd;
#if ZEN_WITH_TESTS
RunTestsCommand RunTestsCmd;
@@ -138,7 +143,9 @@ main(int argc, char** argv)
{"copy", &CopyCmd, "Copy file(s)"},
{"dedup", &DedupCmd, "Dedup files"},
{"drop", &DropCmd, "Drop cache bucket(s)"},
+ {"export-project", &ExportProjectCmd, "Export project store oplog"},
{"hash", &HashCmd, "Compute file hashes"},
+ {"import-project", &ImportProjectCmd, "Import project store oplog"},
{"print", &PrintCmd, "Print compact binary object"},
{"printpackage", &PrintPkgCmd, "Print compact binary package"},
#if ZEN_WITH_EXEC_SERVICES
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index 0e4a46fa1..0db9f02ea 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -839,10 +839,10 @@ CbFieldView::CopyTo(MutableMemoryView Buffer) const
void
CbFieldView::CopyTo(BinaryWriter& Ar) const
{
- const MemoryView Source = GetViewNoType();
+ const MemoryView SourceView = GetViewNoType();
CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(Type);
- Ar.Write(&SerializedType, sizeof(SerializedType));
- Ar.Write(Source.GetData(), Source.GetSize());
+ const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType));
+ Ar.Write({TypeView, SourceView});
}
MemoryView
@@ -948,10 +948,10 @@ CbArrayView::CopyTo(MutableMemoryView Buffer) const
void
CbArrayView::CopyTo(BinaryWriter& Ar) const
{
- const MemoryView Source = GetPayloadView();
- CbFieldType SerializedType = CbFieldTypeOps::GetType(GetType());
- Ar.Write(&SerializedType, sizeof(SerializedType));
- Ar.Write(Source.GetData(), Source.GetSize());
+ const MemoryView SourceView = GetPayloadView();
+ CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(GetType());
+ const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType));
+ Ar.Write({TypeView, SourceView});
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -988,7 +988,7 @@ CbObjectView::VisitFields(ICbVisitor&)
CbFieldView
CbObjectView::FindView(const std::string_view Name) const
{
- for (const CbFieldView Field : *this)
+ for (const CbFieldView& Field : *this)
{
if (Name == Field.GetName())
{
@@ -1001,7 +1001,7 @@ CbObjectView::FindView(const std::string_view Name) const
CbFieldView
CbObjectView::FindViewIgnoreCase(const std::string_view Name) const
{
- for (const CbFieldView Field : *this)
+ for (const CbFieldView& Field : *this)
{
if (Name == Field.GetName())
{
@@ -1061,10 +1061,10 @@ CbObjectView::CopyTo(MutableMemoryView Buffer) const
void
CbObjectView::CopyTo(BinaryWriter& Ar) const
{
- const MemoryView Source = GetPayloadView();
- CbFieldType SerializedType = CbFieldTypeOps::GetType(GetType());
- Ar.Write(&SerializedType, sizeof(SerializedType));
- Ar.Write(Source.GetData(), Source.GetSize());
+ const MemoryView SourceView = GetPayloadView();
+ CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(GetType());
+ const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType));
+ Ar.Write({TypeView, SourceView});
}
//////////////////////////////////////////////////////////////////////////
diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp
index a25466bed..19675b9cf 100644
--- a/zencore/compactbinarypackage.cpp
+++ b/zencore/compactbinarypackage.cpp
@@ -11,7 +11,7 @@ namespace zen {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-CbAttachment::CbAttachment(const CompressedBuffer& InValue) : CbAttachment(InValue.MakeOwned())
+CbAttachment::CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash) : CbAttachment(InValue.MakeOwned(), Hash)
{
}
@@ -19,36 +19,40 @@ CbAttachment::CbAttachment(const SharedBuffer& InValue) : CbAttachment(Composite
{
}
-CbAttachment::CbAttachment(const SharedBuffer& InValue, [[maybe_unused]] const IoHash& InHash)
-: CbAttachment(CompositeBuffer(InValue), InHash)
+CbAttachment::CbAttachment(const SharedBuffer& InValue, const IoHash& InHash) : CbAttachment(CompositeBuffer(InValue), InHash)
{
}
-CbAttachment::CbAttachment(const CompositeBuffer& InValue) : Value{std::in_place_type<BinaryValue>, InValue}
+CbAttachment::CbAttachment(const CompositeBuffer& InValue)
+: Hash(InValue.IsNull() ? IoHash::Zero : IoHash::HashBuffer(InValue))
+, Value(InValue)
{
- if (std::get<BinaryValue>(Value).Buffer.IsNull())
+ if (std::get<CompositeBuffer>(Value).IsNull())
{
Value.emplace<std::nullptr_t>();
}
}
-CbAttachment::CbAttachment(CompositeBuffer&& InValue) : Value{std::in_place_type<BinaryValue>, InValue}
+CbAttachment::CbAttachment(CompositeBuffer&& InValue)
+: Hash(InValue.IsNull() ? IoHash::Zero : IoHash::HashBuffer(InValue))
+, Value(std::move(InValue))
+
{
- if (std::get<BinaryValue>(Value).Buffer.IsNull())
+ if (std::get<CompositeBuffer>(Value).IsNull())
{
Value.emplace<std::nullptr_t>();
}
}
-CbAttachment::CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash) : Value{std::in_place_type<BinaryValue>, InValue, Hash}
+CbAttachment::CbAttachment(CompositeBuffer&& InValue, const IoHash& InHash) : Hash(InHash), Value(InValue)
{
- if (std::get<BinaryValue>(Value).Buffer.IsNull())
+ if (std::get<CompositeBuffer>(Value).IsNull())
{
Value.emplace<std::nullptr_t>();
}
}
-CbAttachment::CbAttachment(CompressedBuffer&& InValue) : Value(std::in_place_type<CompressedBuffer>, InValue)
+CbAttachment::CbAttachment(CompressedBuffer&& InValue, const IoHash& InHash) : Hash(InHash), Value(InValue)
{
if (std::get<CompressedBuffer>(Value).IsNull())
{
@@ -61,11 +65,13 @@ CbAttachment::CbAttachment(const CbObject& InValue, const IoHash* const InHash)
auto SetValue = [&](const CbObject& ValueToSet) {
if (InHash)
{
- Value.emplace<CbObjectValue>(ValueToSet, *InHash);
+ Value.emplace<CbObject>(ValueToSet);
+ Hash = *InHash;
}
else
{
- Value.emplace<CbObjectValue>(ValueToSet, ValueToSet.GetHash());
+ Value.emplace<CbObject>(ValueToSet);
+ Hash = ValueToSet.GetHash();
}
};
@@ -94,7 +100,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields)
if (const CbObjectView ObjectView = Fields.AsObjectView(); !Fields.HasError())
{
// Is a null object or object not prefixed with a precomputed hash value
- Value.emplace<CbObjectValue>(CbObject(ObjectView, Fields.GetOuterBuffer()), ObjectView.GetHash());
+ Value.emplace<CbObject>(CbObject(ObjectView, Fields.GetOuterBuffer()));
+ Hash = ObjectView.GetHash();
++Fields;
}
else if (const IoHash ObjectAttachmentHash = Fields.AsObjectAttachment(); !Fields.HasError())
@@ -106,7 +113,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields)
{
return false;
}
- Value.emplace<CbObjectValue>(CbObject(InnerObjectView, Fields.GetOuterBuffer()), ObjectAttachmentHash);
+ Value.emplace<CbObject>(CbObject(InnerObjectView, Fields.GetOuterBuffer()));
+ Hash = ObjectAttachmentHash;
++Fields;
}
else if (const IoHash BinaryAttachmentHash = Fields.AsBinaryAttachment(); !Fields.HasError())
@@ -118,7 +126,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields)
{
return false;
}
- Value.emplace<BinaryValue>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()), BinaryAttachmentHash);
+ Value.emplace<CompositeBuffer>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()));
+ Hash = BinaryAttachmentHash;
++Fields;
}
else if (MemoryView BinaryView = Fields.AsBinaryView(); !Fields.HasError())
@@ -126,14 +135,17 @@ CbAttachment::TryLoad(CbFieldIterator& Fields)
if (BinaryView.GetSize() > 0)
{
// Is a compressed binary blob
- Value.emplace<CompressedBuffer>(
- CompressedBuffer::FromCompressed(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())).MakeOwned());
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())).MakeOwned();
+ Value.emplace<CompressedBuffer>(Compressed);
+ Hash = IoHash::FromBLAKE3(Compressed.GetRawHash());
++Fields;
}
else
{
// Is an uncompressed empty binary blob
- Value.emplace<BinaryValue>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()), IoHash::HashBuffer(nullptr, 0));
+ Value.emplace<CompositeBuffer>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()));
+ Hash = IoHash::HashBuffer(nullptr, 0);
++Fields;
}
}
@@ -179,7 +191,8 @@ TryLoad_ArchiveFieldIntoAttachment(CbAttachment& TargetAttachment, CbField&& Fie
if (Buffer.GetSize() > 0)
{
// Is a compressed binary blob
- TargetAttachment = CbAttachment(CompressedBuffer::FromCompressed(std::move(Buffer)));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Buffer));
+ TargetAttachment = CbAttachment(Compressed, IoHash::FromBLAKE3(Compressed.GetRawHash()));
}
else
{
@@ -205,25 +218,25 @@ CbAttachment::TryLoad(BinaryReader& Reader, BufferAllocator Allocator)
void
CbAttachment::Save(CbWriter& Writer) const
{
- if (const CbObjectValue* ObjValue = std::get_if<CbObjectValue>(&Value))
+ if (const CbObject* Object = std::get_if<CbObject>(&Value))
{
- if (ObjValue->Object)
+ if (*Object)
{
- Writer.AddObjectAttachment(ObjValue->Hash);
+ Writer.AddObjectAttachment(Hash);
}
- Writer.AddObject(ObjValue->Object);
+ Writer.AddObject(*Object);
}
- else if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value))
+ else if (const CompositeBuffer* Binary = std::get_if<CompositeBuffer>(&Value))
{
- if (BinValue->Buffer.GetSize() > 0)
+ if (Binary->GetSize() > 0)
{
- Writer.AddBinaryAttachment(BinValue->Hash);
+ Writer.AddBinaryAttachment(Hash);
}
- Writer.AddBinary(BinValue->Buffer);
+ Writer.AddBinary(*Binary);
}
- else if (const CompressedBuffer* BufferValue = std::get_if<CompressedBuffer>(&Value))
+ else if (const CompressedBuffer* Compressed = std::get_if<CompressedBuffer>(&Value))
{
- Writer.AddBinary(BufferValue->GetCompressed());
+ Writer.AddBinary(Compressed->GetCompressed());
}
}
@@ -244,7 +257,7 @@ CbAttachment::IsNull() const
bool
CbAttachment::IsBinary() const
{
- return std::holds_alternative<BinaryValue>(Value);
+ return std::holds_alternative<CompositeBuffer>(Value);
}
bool
@@ -256,36 +269,21 @@ CbAttachment::IsCompressedBinary() const
bool
CbAttachment::IsObject() const
{
- return std::holds_alternative<CbObjectValue>(Value);
+ return std::holds_alternative<CbObject>(Value);
}
IoHash
CbAttachment::GetHash() const
{
- if (const CompressedBuffer* Buffer = std::get_if<CompressedBuffer>(&Value))
- {
- return IoHash::FromBLAKE3(Buffer->GetRawHash());
- }
-
- if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value))
- {
- return BinValue->Hash;
- }
-
- if (const CbObjectValue* ObjectValue = std::get_if<CbObjectValue>(&Value))
- {
- return ObjectValue->Hash;
- }
-
- return IoHash::Zero;
+ return Hash;
}
CompositeBuffer
CbAttachment::AsCompositeBinary() const
{
- if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value))
+ if (const CompositeBuffer* BinValue = std::get_if<CompositeBuffer>(&Value))
{
- return BinValue->Buffer;
+ return *BinValue;
}
return CompositeBuffer::Null;
@@ -294,9 +292,9 @@ CbAttachment::AsCompositeBinary() const
SharedBuffer
CbAttachment::AsBinary() const
{
- if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value))
+ if (const CompositeBuffer* BinValue = std::get_if<CompositeBuffer>(&Value))
{
- return BinValue->Buffer.Flatten();
+ return BinValue->Flatten();
}
return {};
@@ -305,9 +303,9 @@ CbAttachment::AsBinary() const
CompressedBuffer
CbAttachment::AsCompressedBinary() const
{
- if (const CompressedBuffer* Buffer = std::get_if<CompressedBuffer>(&Value))
+ if (const CompressedBuffer* CompValue = std::get_if<CompressedBuffer>(&Value))
{
- return *Buffer;
+ return *CompValue;
}
return CompressedBuffer::Null;
@@ -317,9 +315,9 @@ CbAttachment::AsCompressedBinary() const
CbObject
CbAttachment::AsObject() const
{
- if (const CbObjectValue* ObjectValue = std::get_if<CbObjectValue>(&Value))
+ if (const CbObject* ObjectValue = std::get_if<CbObject>(&Value))
{
- return ObjectValue->Object;
+ return *ObjectValue;
}
return {};
@@ -377,6 +375,19 @@ CbPackage::AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Res
}
}
+void
+CbPackage::AddAttachments(std::span<const CbAttachment> InAttachments)
+{
+ if (InAttachments.empty())
+ {
+ return;
+ }
+ // Assume we have no duplicates!
+ Attachments.insert(Attachments.end(), InAttachments.begin(), InAttachments.end());
+ std::sort(Attachments.begin(), Attachments.end());
+ ZEN_ASSERT_SLOW(std::unique(Attachments.begin(), Attachments.end()) == Attachments.end());
+}
+
int32_t
CbPackage::RemoveAttachment(const IoHash& Hash)
{
@@ -710,7 +721,7 @@ namespace legacy {
{
return false;
}
- Package.AddAttachment(CbAttachment(Compressed));
+ Package.AddAttachment(CbAttachment(Compressed, Hash));
}
else
{
@@ -738,7 +749,7 @@ namespace legacy {
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(AttachmentData))
{
- Package.AddAttachment(CbAttachment(Compressed));
+ Package.AddAttachment(CbAttachment(Compressed, Hash));
}
else
{
@@ -978,15 +989,15 @@ TEST_CASE("usonpackage.serialization")
{
using namespace std::literals;
- const auto TestSaveLoadValidate = [&](const char* Test, const CbPackage& Package) {
+ const auto TestSaveLoadValidate = [&](const char* Test, CbPackage& InOutPackage) {
ZEN_UNUSED(Test);
CbWriter Writer;
- Package.Save(Writer);
+ InOutPackage.Save(Writer);
CbFieldIterator Fields = Writer.Save();
BinaryWriter MemStream;
- Package.Save(MemStream);
+ InOutPackage.Save(MemStream);
CHECK(MakeMemoryView(MemStream).EqualBytes(Fields.GetRangeBuffer().GetView()));
CHECK(ValidateCompactBinaryRange(MakeMemoryView(MemStream), CbValidateMode::All) == CbValidateError::None);
@@ -995,13 +1006,14 @@ TEST_CASE("usonpackage.serialization")
CbPackage FromFields;
FromFields.TryLoad(Fields);
CHECK_FALSE(bool(Fields));
- CHECK(FromFields == Package);
+ CHECK(FromFields == InOutPackage);
CbPackage FromArchive;
BinaryReader ReadAr(MakeMemoryView(MemStream));
FromArchive.TryLoad(ReadAr);
CHECK(ReadAr.CurrentOffset() == ReadAr.Size());
- CHECK(FromArchive == Package);
+ CHECK(FromArchive == InOutPackage);
+ InOutPackage = FromArchive;
};
SUBCASE("Empty")
@@ -1083,7 +1095,7 @@ TEST_CASE("usonpackage.serialization")
const CbAttachment* const Object2Attachment = Package.FindAttachment(Object2.GetHash());
CHECK((Object1Attachment && Object1Attachment->AsObject().Equals(Object1)));
- CHECK((Object2Attachment && Object2Attachment->AsBinary() == Object2.GetBuffer()));
+ CHECK((Object2Attachment && Object2Attachment->AsBinary().GetView().EqualBytes(Object2.GetBuffer().GetView())));
SharedBuffer Object1ClonedBuffer = SharedBuffer::Clone(Object1.GetOuterBuffer());
Package.AddAttachment(CbAttachment(Object1ClonedBuffer));
diff --git a/zencore/compositebuffer.cpp b/zencore/compositebuffer.cpp
index 3190ca5ea..e4ca93cc2 100644
--- a/zencore/compositebuffer.cpp
+++ b/zencore/compositebuffer.cpp
@@ -124,6 +124,100 @@ CompositeBuffer::ViewOrCopyRange(uint64_t Offset, uint64_t Size, UniqueBuffer& C
return View;
}
+CompositeBuffer::Iterator
+CompositeBuffer::GetIterator(uint64_t Offset) const
+{
+ size_t SegmentCount = m_Segments.size();
+ size_t SegmentIndex = 0;
+ while (SegmentIndex < SegmentCount)
+ {
+ size_t SegmentSize = m_Segments[SegmentIndex].GetSize();
+ if (Offset < SegmentSize)
+ {
+ return {.SegmentIndex = SegmentIndex, .OffsetInSegment = Offset};
+ }
+ Offset -= SegmentSize;
+ SegmentIndex++;
+ }
+ return {.SegmentIndex = ~0ull, .OffsetInSegment = ~0ull};
+}
+
+MemoryView
+CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& CopyBuffer) const
+{
+ MutableMemoryView WriteView;
+ size_t SegmentCount = m_Segments.size();
+ ZEN_ASSERT(It.SegmentIndex < SegmentCount);
+ uint64_t SizeLeft = Size;
+ while (SizeLeft > 0 && It.SegmentIndex < SegmentCount)
+ {
+ const SharedBuffer& Segment = m_Segments[It.SegmentIndex];
+ size_t SegmentSize = Segment.GetSize();
+ if (Size == SizeLeft && Size <= (SegmentSize - It.OffsetInSegment))
+ {
+ MemoryView View = Segment.GetView();
+ View.RightChopInline(It.OffsetInSegment);
+ View.LeftInline(SizeLeft);
+ It.OffsetInSegment += SizeLeft;
+ ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize);
+ if (It.OffsetInSegment == SegmentSize)
+ {
+ It.SegmentIndex++;
+ It.OffsetInSegment = 0;
+ }
+ return View;
+ }
+ if (WriteView.GetSize() == 0)
+ {
+ if (CopyBuffer.GetSize() < Size)
+ {
+ CopyBuffer = UniqueBuffer::Alloc(Size);
+ }
+ WriteView = CopyBuffer.GetMutableView();
+ }
+ size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft);
+ MemoryView ReadView = Segment.GetView();
+ ReadView.RightChopInline(It.OffsetInSegment);
+ ReadView.LeftInline(CopySize);
+ WriteView = WriteView.CopyFrom(ReadView);
+ It.OffsetInSegment += CopySize;
+ ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize);
+ if (It.OffsetInSegment == SegmentSize)
+ {
+ It.SegmentIndex++;
+ It.OffsetInSegment = 0;
+ }
+ SizeLeft -= CopySize;
+ }
+ return CopyBuffer.GetView().Left(Size - SizeLeft);
+}
+
+void
+CompositeBuffer::CopyTo(MutableMemoryView WriteView, Iterator& It) const
+{
+ size_t SizeLeft = WriteView.GetSize();
+ size_t SegmentCount = m_Segments.size();
+ ZEN_ASSERT(It.SegmentIndex < SegmentCount);
+ while (WriteView.GetSize() > 0 && It.SegmentIndex < SegmentCount)
+ {
+ const SharedBuffer& Segment = m_Segments[It.SegmentIndex];
+ size_t SegmentSize = Segment.GetSize();
+ size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft);
+ MemoryView ReadView = Segment.GetView();
+ ReadView.RightChopInline(It.OffsetInSegment);
+ ReadView.LeftInline(CopySize);
+ WriteView = WriteView.CopyFrom(ReadView);
+ It.OffsetInSegment += CopySize;
+ ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize);
+ if (It.OffsetInSegment == SegmentSize)
+ {
+ It.SegmentIndex++;
+ It.OffsetInSegment = 0;
+ }
+ SizeLeft -= CopySize;
+ }
+}
+
void
CompositeBuffer::CopyTo(MutableMemoryView Target, uint64_t Offset) const
{
diff --git a/zencore/compress.cpp b/zencore/compress.cpp
index 35a5acb3a..15cc5f6a7 100644
--- a/zencore/compress.cpp
+++ b/zencore/compress.cpp
@@ -65,7 +65,8 @@ struct BufferHeader
BufferHeader Header;
if (sizeof(BufferHeader) <= CompressedData.GetSize())
{
- CompressedData.CopyTo(MakeMutableMemoryView(&Header, &Header + 1));
+ CompositeBuffer::Iterator It;
+ CompressedData.CopyTo(MakeMutableMemoryView(&Header, &Header + 1), It);
Header.ByteSwap();
}
return Header;
@@ -235,10 +236,13 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
{
UniqueBuffer RawBlockCopy;
MutableMemoryView CompressedBlocksView = CompressedData.GetMutableView() + sizeof(BufferHeader) + MetaSize;
+
+ CompositeBuffer::Iterator It = RawData.GetIterator(0);
+
for (uint64_t RawOffset = 0; RawOffset < RawSize;)
{
const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
- const MemoryView RawBlock = RawData.ViewOrCopyRange(RawOffset, RawBlockSize, RawBlockCopy);
+ const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
RawHash.Append(RawBlock);
MutableMemoryView CompressedBlock = CompressedBlocksView;
@@ -669,8 +673,10 @@ BufferHeader::IsValid(const CompositeBuffer& CompressedData)
{
if (const BaseDecoder* const Decoder = GetDecoder(Header.Method))
{
- UniqueBuffer HeaderCopy;
- const MemoryView HeaderView = CompressedData.ViewOrCopyRange(0, Decoder->GetHeaderSize(Header), HeaderCopy);
+ UniqueBuffer HeaderCopy = UniqueBuffer::Alloc(Decoder->GetHeaderSize(Header));
+ CompositeBuffer::Iterator It;
+ CompressedData.CopyTo(HeaderCopy.GetMutableView(), It);
+ const MemoryView HeaderView = HeaderCopy.GetView();
if (Header.Crc32 == BufferHeader::CalculateCrc32(HeaderView))
{
return true;
@@ -851,6 +857,30 @@ CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData)
return Local;
}
+CompressedBuffer
+CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData)
+{
+ if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader))
+ {
+ return CompressedBuffer();
+ }
+ CompressedBuffer Local;
+ Local.CompressedData = CompositeBuffer(SharedBuffer(std::move(InCompressedData)));
+ return Local;
+}
+
+CompressedBuffer
+CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData)
+{
+ if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader))
+ {
+ return CompressedBuffer();
+ }
+ CompressedBuffer Local;
+ Local.CompressedData = std::move(InCompressedData);
+ return Local;
+}
+
uint64_t
CompressedBuffer::GetRawSize() const
{
@@ -927,7 +957,9 @@ CompressedBuffer::DecompressToComposite() const
}
bool
-CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, OodleCompressionLevel& OutCompressionLevel) const
+CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor,
+ OodleCompressionLevel& OutCompressionLevel,
+ uint64_t& OutBlockSize) const
{
using namespace detail;
if (CompressedData)
@@ -937,10 +969,12 @@ CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, Oodle
case CompressionMethod::None:
OutCompressor = OodleCompressor::NotSet;
OutCompressionLevel = OodleCompressionLevel::None;
+ OutBlockSize = 0;
return true;
case CompressionMethod::Oodle:
OutCompressor = OodleCompressor(Header.Compressor);
OutCompressionLevel = OodleCompressionLevel(Header.CompressionLevel);
+ OutBlockSize = uint64_t(1) << Header.BlockSizeExponent;
return true;
default:
break;
diff --git a/zencore/include/zencore/compactbinarypackage.h b/zencore/include/zencore/compactbinarypackage.h
index c3e587f40..16f723edc 100644
--- a/zencore/include/zencore/compactbinarypackage.h
+++ b/zencore/include/zencore/compactbinarypackage.h
@@ -64,8 +64,8 @@ public:
ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash);
/** Construct a compressed binary attachment. Value is cloned if not owned. */
- ZENCORE_API explicit CbAttachment(const CompressedBuffer& InValue);
- ZENCORE_API explicit CbAttachment(CompressedBuffer&& InValue);
+ ZENCORE_API explicit CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash);
+ ZENCORE_API explicit CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash);
/** Reset this to a null attachment. */
inline void Reset() { *this = CbAttachment(); }
@@ -133,28 +133,8 @@ public:
private:
ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash);
- struct CbObjectValue
- {
- CbObject Object;
- IoHash Hash;
-
- CbObjectValue(const CbObject& InObject, const IoHash& InHash) : Object(InObject), Hash(InHash) {}
- CbObjectValue(CbObject&& InObject, const IoHash& InHash) : Object(std::move(InObject)), Hash(InHash) {}
- };
-
- struct BinaryValue
- {
- CompositeBuffer Buffer;
- IoHash Hash;
-
- BinaryValue(const CompositeBuffer& InBuffer) : Buffer(InBuffer.MakeOwned()), Hash(IoHash::HashBuffer(InBuffer)) {}
- BinaryValue(const CompositeBuffer& InBuffer, const IoHash& InHash) : Buffer(InBuffer.MakeOwned()), Hash(InHash) {}
- BinaryValue(CompositeBuffer&& InBuffer) : Buffer(std::move(InBuffer)), Hash(IoHash::HashBuffer(Buffer)) {}
- BinaryValue(CompositeBuffer&& InBuffer, const IoHash& InHash) : Buffer(std::move(InBuffer)), Hash(InHash) {}
- BinaryValue(SharedBuffer&& InBuffer, const IoHash& InHash) : Buffer(std::move(InBuffer)), Hash(InHash) {}
- };
-
- std::variant<std::nullptr_t, CbObjectValue, BinaryValue, CompressedBuffer> Value;
+ IoHash Hash;
+ std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -300,6 +280,8 @@ public:
/** Add the attachment to this package, along with any references that can be resolved. */
inline void AddAttachment(const CbAttachment& Attachment, AttachmentResolver Resolver) { AddAttachment(Attachment, &Resolver); }
+ void AddAttachments(std::span<const CbAttachment> Attachments);
+
/**
* Remove an attachment by hash.
*
diff --git a/zencore/include/zencore/compositebuffer.h b/zencore/include/zencore/compositebuffer.h
index 4a3b60428..4e4b4d002 100644
--- a/zencore/include/zencore/compositebuffer.h
+++ b/zencore/include/zencore/compositebuffer.h
@@ -99,6 +99,15 @@ public:
uint64_t Size,
std::function<void(MemoryView View, const SharedBuffer& ViewOuter)> Visitor) const;
+ struct Iterator
+ {
+ size_t SegmentIndex = 0;
+ uint64_t OffsetInSegment = 0;
+ };
+ ZENCORE_API Iterator GetIterator(uint64_t Offset) const;
+ ZENCORE_API MemoryView ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& CopyBuffer) const;
+ ZENCORE_API void CopyTo(MutableMemoryView Target, Iterator& It) const;
+
/** A null composite buffer. */
static const CompositeBuffer Null;
diff --git a/zencore/include/zencore/compress.h b/zencore/include/zencore/compress.h
index d37ecfa79..92dc1fb76 100644
--- a/zencore/include/zencore/compress.h
+++ b/zencore/include/zencore/compress.h
@@ -75,6 +75,8 @@ public:
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(CompositeBuffer&& CompressedData);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(const SharedBuffer& CompressedData);
[[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(SharedBuffer&& CompressedData);
+ [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData);
+ [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData);
/** Reset this to null. */
inline void Reset() { CompressedData.Reset(); }
@@ -89,8 +91,8 @@ public:
[[nodiscard]] inline bool IsOwned() const { return CompressedData.IsOwned(); }
/** Returns a copy of the compressed buffer that owns its underlying memory. */
- [[nodiscard]] inline CompressedBuffer MakeOwned() const& { return FromCompressed(CompressedData.MakeOwned()); }
- [[nodiscard]] inline CompressedBuffer MakeOwned() && { return FromCompressed(std::move(CompressedData).MakeOwned()); }
+ [[nodiscard]] inline CompressedBuffer MakeOwned() const& { return FromCompressedNoValidate(CompressedData.MakeOwned()); }
+ [[nodiscard]] inline CompressedBuffer MakeOwned() && { return FromCompressedNoValidate(std::move(CompressedData).MakeOwned()); }
/** Returns a composite buffer containing the compressed data. May be null. May not be owned. */
[[nodiscard]] inline const CompositeBuffer& GetCompressed() const& { return CompressedData; }
@@ -117,7 +119,8 @@ public:
* @return True if parameters were written, otherwise false.
*/
[[nodiscard]] ZENCORE_API bool TryGetCompressParameters(OodleCompressor& OutCompressor,
- OodleCompressionLevel& OutCompressionLevel) const;
+ OodleCompressionLevel& OutCompressionLevel,
+ uint64_t& OutBlockSize) const;
/**
* Decompress into a memory view that is less or equal GetRawSize() bytes.
diff --git a/zencore/include/zencore/stream.h b/zencore/include/zencore/stream.h
index ec303e1f8..9e4996249 100644
--- a/zencore/include/zencore/stream.h
+++ b/zencore/include/zencore/stream.h
@@ -28,6 +28,7 @@ public:
}
inline void Write(MemoryView Memory) { Write(Memory.GetData(), Memory.GetSize()); }
+ void Write(std::initializer_list<const MemoryView> Buffers);
inline uint64_t CurrentOffset() const { return m_Offset; }
@@ -41,7 +42,6 @@ public:
inline MutableMemoryView GetMutableView() { return MutableMemoryView(m_Buffer.data(), m_Offset); }
private:
- RwLock m_Lock;
std::vector<uint8_t> m_Buffer;
uint64_t m_Offset = 0;
diff --git a/zencore/stream.cpp b/zencore/stream.cpp
index 8faf90af2..3402e51be 100644
--- a/zencore/stream.cpp
+++ b/zencore/stream.cpp
@@ -11,10 +11,28 @@
namespace zen {
void
-BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset)
+BinaryWriter::Write(std::initializer_list<const MemoryView> Buffers)
{
- RwLock::ExclusiveLockScope _(m_Lock);
+ size_t TotalByteCount = 0;
+ for (const MemoryView& View : Buffers)
+ {
+ TotalByteCount += View.GetSize();
+ }
+ const size_t NeedEnd = m_Offset + TotalByteCount;
+ if (NeedEnd > m_Buffer.size())
+ {
+ m_Buffer.resize(NeedEnd);
+ }
+ for (const MemoryView& View : Buffers)
+ {
+ memcpy(m_Buffer.data() + m_Offset, View.GetData(), View.GetSize());
+ m_Offset += View.GetSize();
+ }
+}
+void
+BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset)
+{
const size_t NeedEnd = Offset + ByteCount;
if (NeedEnd > m_Buffer.size())
@@ -28,8 +46,6 @@ BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset)
void
BinaryWriter::Reset()
{
- RwLock::ExclusiveLockScope _(m_Lock);
-
m_Buffer.clear();
m_Offset = 0;
}
@@ -41,6 +57,18 @@ BinaryWriter::Reset()
#if ZEN_WITH_TESTS
+TEST_CASE("binary.writer.span")
+{
+ BinaryWriter Writer;
+ const MemoryView View1("apa", 3);
+ const MemoryView View2(" ", 1);
+ const MemoryView View3("banan", 5);
+ Writer.Write({View1, View2, View3});
+ MemoryView Result = Writer.GetView();
+ CHECK(Result.GetSize() == 9);
+ CHECK(memcmp(Result.GetData(), "apa banan", 9) == 0);
+}
+
void
stream_forcelink()
{
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index a7dca5441..e2f061a87 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -17,6 +17,10 @@
#include <span>
#include <vector>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
std::vector<IoBuffer>
@@ -93,10 +97,12 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
ResponseBuffers.push_back(std::move(RefBuffer));
};
- auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary,
- bool DenyPartialLocalReferences,
- CbAttachmentReferenceHeader& LocalRef,
- std::string& Path8) -> bool {
+ tsl::robin_map<void*, std::string> FileNameMap;
+
+ auto IsLocalRef = [&FileNameMap](const CompositeBuffer& AttachmentBinary,
+ bool DenyPartialLocalReferences,
+ CbAttachmentReferenceHeader& LocalRef,
+ std::string& Path8) -> bool {
const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
IoBufferFileReference Ref;
const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
@@ -111,9 +117,17 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
return false;
}
- ExtendablePathBuilder<256> LocalRefFile;
- LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
- Path8 = LocalRefFile.ToUtf8();
+ if (auto It = FileNameMap.find(Ref.FileHandle); It != FileNameMap.end())
+ {
+ Path8 = It->second;
+ }
+ else
+ {
+ ExtendablePathBuilder<256> LocalRefFile;
+ LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
+ Path8 = LocalRefFile.ToUtf8();
+ FileNameMap.insert_or_assign(Ref.FileHandle, Path8);
+ }
LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size());
LocalRef.PayloadByteOffset = Ref.FileChunkOffset;
@@ -131,7 +145,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary())
{
CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
- IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+ IoHash AttachmentHash = Attachment.GetHash();
// If the data is either not backed by a file, or there are multiple
// fragments then we cannot marshal it by local reference. We might
@@ -158,7 +172,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
{
*AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
.Flags = CbAttachmentEntry::kIsCompressed,
- .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+ .AttachmentHash = AttachmentHash};
for (const SharedBuffer& Segment : Compressed.GetSegments())
{
@@ -177,7 +191,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
}
else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
{
- IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+ IoHash AttachmentHash = Attachment.GetHash();
bool MarshalByLocalRef =
EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences);
@@ -211,6 +225,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
ZEN_NOT_IMPLEMENTED("Unknown attachment kind");
}
}
+ FileNameMap.clear();
return ResponseBuffers;
}
@@ -262,6 +277,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
CbPackage Package;
+ std::vector<CbAttachment> Attachments;
+ Attachments.reserve(ChunkCount); // Guessing here...
+
+ tsl::robin_map<std::string, IoBuffer> PartialFileBuffers;
+
for (uint32_t i = 0; i < ChunkCount; ++i)
{
const CbAttachmentEntry& Entry = AttachmentEntries[i];
@@ -283,23 +303,18 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)};
- if (IoBuffer ChunkReference =
- IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize))
+ IoBuffer FullFileBuffer;
+ if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end())
{
- CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)));
- if (!CompBuf)
- {
- throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})",
- i,
- PathToUtf8(Path),
- AttachRefHdr->PayloadByteOffset,
- AttachRefHdr->PayloadByteSize));
- }
- CbAttachment Attachment(std::move(CompBuf));
- Package.AddAttachment(Attachment);
+ FullFileBuffer = It->second;
}
else
{
+ FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second;
+ }
+
+ if (!FullFileBuffer)
+ {
// Unable to open chunk reference
throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})",
i,
@@ -307,6 +322,21 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
AttachRefHdr->PayloadByteOffset,
AttachRefHdr->PayloadByteSize));
}
+
+ IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize()
+ ? FullFileBuffer
+ : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
+
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference)));
+ if (!CompBuf)
+ {
+ throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})",
+ i,
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+ Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash));
}
else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
@@ -341,8 +371,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
throw std::runtime_error(fmt::format("invalid format for chunk #{} expected compressed buffer for attachment", i));
}
- CbAttachment Attachment(std::move(CompBuf));
- Package.AddAttachment(Attachment);
+ Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash));
}
}
else /* not compressed */
@@ -367,10 +396,13 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView());
CbAttachment Attachment(SharedBuffer{AttachmentBufferCopy});
- Package.AddAttachment(Attachment);
+ Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy});
}
}
}
+ PartialFileBuffers.clear();
+
+ Package.AddAttachments(Attachments);
return Package;
}
@@ -517,12 +549,13 @@ CbPackageReader::Finalize()
if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
- m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))));
+ m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)), Entry.AttachmentHash));
}
else
{
- m_Attachments.push_back(CbAttachment(
- CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None)));
+ CompressedBuffer Compressed =
+ CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None);
+ m_Attachments.push_back(CbAttachment(std::move(Compressed), IoHash::FromBLAKE3(Compressed.GetRawHash())));
}
}
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 82b770e3c..1d0596c29 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -523,7 +523,8 @@ TEST_CASE("project.basic")
{
uint8_t AttachData[] = {1, 2, 3};
- zen::CbAttachment Attach{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}))};
+ zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
+ zen::CbAttachment Attach{Attachment, IoHash::FromBLAKE3(Attachment.GetRawHash())};
zen::CbObjectWriter OpWriter;
OpWriter << "key"
@@ -773,7 +774,7 @@ TEST_CASE("zcache.cbpackage")
zen::CbPackage Package;
Package.SetObject(Obj.Save().AsObject());
- Package.AddAttachment(zen::CbAttachment(CompressedData));
+ Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
return Package;
};
@@ -999,7 +1000,7 @@ TEST_CASE("zcache.policy")
zen::CbPackage Package;
Package.SetObject(CacheRecord);
- Package.AddAttachment(zen::CbAttachment(CompressedData));
+ Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));
return Package;
};
@@ -2655,12 +2656,14 @@ TEST_CASE("http.package")
static const uint8_t Data1[] = {0, 1, 2, 3};
static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8};
- zen::CbAttachment Attach1{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}),
- zen::OodleCompressor::NotSet,
- zen::OodleCompressionLevel::None)};
- zen::CbAttachment Attach2{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}),
- zen::OodleCompressor::NotSet,
- zen::OodleCompressionLevel::None)};
+ zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}),
+ zen::OodleCompressor::NotSet,
+ zen::OodleCompressionLevel::None);
+ zen::CbAttachment Attach1{AttachmentData1, IoHash::FromBLAKE3(AttachmentData1.GetRawHash())};
+ zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}),
+ zen::OodleCompressor::NotSet,
+ zen::OodleCompressionLevel::None);
+ zen::CbAttachment Attach2{AttachmentData2, IoHash::FromBLAKE3(AttachmentData2.GetRawHash())};
zen::CbObjectWriter Writer;
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index c5beef4b3..dfb69c0fe 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -530,7 +530,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ Package.AddAttachment(
+ CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash()));
}
else
{
@@ -681,7 +682,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ Package.AddAttachment(
+ CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash()));
Count.Valid++;
}
}
@@ -1605,7 +1607,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
{
if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
{
- ResponsePackage.AddAttachment(CbAttachment(Value.Payload));
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash())));
}
}
@@ -1959,10 +1961,11 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
const CompressedBuffer& Result = Request.Result;
if (Result)
{
- ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash()));
+ IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash());
+ ResponseObject.AddHash("RawHash"sv, Hash);
if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Result));
+ RpcResponse.AddAttachment(CbAttachment(Result, Hash));
}
else
{
@@ -2475,7 +2478,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
Writer.AddHash("RawHash"sv, Request.Key->ChunkId);
if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
{
- RpcResponse.AddAttachment(CbAttachment(Request.Value));
+ RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId));
}
else
{
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index d46b312d3..6940583d1 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compactbinaryvalue.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -15,6 +16,8 @@
#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
@@ -61,7 +64,7 @@ namespace {
// We can't move our folder, probably because it is busy, bail..
return false;
}
- zen::Sleep(100);
+ Sleep(100);
} while (true);
}
} // namespace
@@ -77,7 +80,7 @@ OpKeyStringAsOId(std::string_view OpKey)
Writer << "key"sv << OpKey;
XXH3_128Stream KeyHasher;
- Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
Oid OpId;
@@ -142,7 +145,7 @@ struct ProjectStore::OplogStorage : public RefCounted
uint64_t InvalidEntries = 0;
m_Oplog.Replay(
- [&](const zen::OplogEntry& LogEntry) {
+ [&](const OplogEntry& LogEntry) {
if (LogEntry.OpCoreSize == 0)
{
++InvalidEntries;
@@ -209,6 +212,8 @@ struct ProjectStore::OplogStorage : public RefCounted
{
ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp");
+ using namespace std::literals;
+
SharedBuffer Buffer = Op.GetBuffer();
const uint64_t WriteSize = Buffer.GetSize();
const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
@@ -216,7 +221,7 @@ struct ProjectStore::OplogStorage : public RefCounted
ZEN_ASSERT(WriteSize != 0);
XXH3_128Stream KeyHasher;
- Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
XXH3_128 KeyHash = KeyHasher.GetHash();
RwLock::ExclusiveLockScope _(m_RwLock);
@@ -275,7 +280,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Stor
m_TempPath = m_BasePath / "temp"sv;
- zen::CleanDirectory(m_TempPath);
+ CleanDirectory(m_TempPath);
}
ProjectStore::Oplog::~Oplog()
@@ -476,6 +481,11 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
return false;
}
+ if (Hash != IoHash::Zero)
+ {
+ m_ChunkMap.insert_or_assign(FileId, Hash);
+ }
+
FileMapEntry Entry;
Entry.ServerPath = ServerPath;
Entry.ClientPath = ClientPath;
@@ -599,9 +609,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
using namespace std::literals;
- const CbObject& Core = OpPackage.GetObject();
- const OplogEntry OpEntry = m_Storage->AppendOp(Core);
-
// Persist attachments
uint64_t AttachmentBytes = 0;
@@ -624,12 +631,25 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
AttachmentBytes += AttachmentSize;
}
- const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
+ const CbObject& Core = OpPackage.GetObject();
+ const uint32_t EntryId = AppendNewOplogEntry(Core);
+
+ ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
+
+ return EntryId;
+}
+
+uint32_t
+ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total",
- EntryId,
- zen::NiceBytes(NewAttachmentBytes),
- zen::NiceBytes(AttachmentBytes));
+ ZEN_ASSERT(m_Storage);
+
+ using namespace std::literals;
+
+ const OplogEntry OpEntry = m_Storage->AppendOp(Core);
+ const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
return EntryId;
}
@@ -672,11 +692,11 @@ ProjectStore::Project::Read()
{
CbObject Cfg = LoadCompactBinaryObject(Obj);
- Identifier = Cfg["id"].AsString();
- RootDir = Cfg["root"].AsString();
- ProjectRootDir = Cfg["project"].AsString();
- EngineRootDir = Cfg["engine"].AsString();
- ProjectFilePath = Cfg["projectfile"].AsString();
+ Identifier = Cfg["id"sv].AsString();
+ RootDir = Cfg["root"sv].AsString();
+ ProjectRootDir = Cfg["project"sv].AsString();
+ EngineRootDir = Cfg["engine"sv].AsString();
+ ProjectFilePath = Cfg["projectfile"sv].AsString();
}
else
{
@@ -930,7 +950,7 @@ ProjectStore::Project::IsExpired() const
ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc)
: GcStorage(Gc)
, GcContributor(Gc)
-, m_Log(zen::logging::Get("project"))
+, m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectBasePath(BasePath)
{
@@ -1245,6 +1265,251 @@ ProjectStore::Exists(std::string_view ProjectId)
return Project::Exists(BasePathForProject(ProjectId));
}
+CbArray
+ProjectStore::GetProjectsList()
+{
+ using namespace std::literals;
+
+ DiscoverProjects();
+
+ CbWriter Response;
+ Response.BeginArray();
+
+ IterateProjects([&Response](ProjectStore::Project& Prj) {
+ Response.BeginObject();
+ Response << "Id"sv << Prj.Identifier;
+ Response << "RootDir"sv << Prj.RootDir.string();
+ Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
+ Response << "EngineRootDir"sv << Prj.EngineRootDir;
+ Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
+ Response.EndObject();
+ });
+ Response.EndArray();
+ return Response.Save().AsArray();
+}
+
+HttpResponseCode
+ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Project file request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Project file for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+
+ CbObjectWriter Response;
+ Response.BeginArray("files"sv);
+
+ FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+ Response.BeginObject();
+ Response << "id"sv << Id;
+ Response << "clientpath"sv << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath"sv << ServerPath;
+ }
+ Response.EndObject();
+ });
+
+ Response.EndArray();
+ OutPayload = Response.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunkInfo(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ CbObject& OutPayload)
+{
+ using namespace std::literals;
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Chunk info request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ ZEN_INFO("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::NotFound;
+ }
+
+ uint64_t ChunkSize = Chunk.GetSize();
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+ ChunkSize = Compressed.GetRawSize();
+ }
+
+ CbObjectWriter Response;
+ Response << "size"sv << ChunkSize;
+ OutPayload = Response.Save();
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk)
+{
+ bool IsOffset = Offset != 0 || Size != ~(0ull);
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ ZEN_INFO("Chunk request for unknown project '{}'", ProjectId);
+ return HttpResponseCode::NotFound;
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ ZEN_INFO("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId);
+ return HttpResponseCode::NotFound;
+ }
+
+ if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
+ {
+ ZEN_INFO("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Chunk = FoundLog->FindChunk(Obj);
+ if (!Chunk)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
+ return HttpResponseCode::NotFound;
+ }
+
+ OutChunk = Chunk;
+ HttpContentType ContentType = Chunk.GetContentType();
+
+ if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ ZEN_ASSERT(!Compressed.IsNull());
+
+ if (IsOffset)
+ {
+ uint64_t RawSize = Compressed.GetRawSize();
+ if ((Offset + Size) > RawSize)
+ {
+ Size = RawSize - Offset;
+ }
+
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ // Value will be a range of compressed blocks that covers the requested range
+ // The client will have to compensate for any offsets that do not land on an even block size multiple
+ OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ else
+ {
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ }
+ }
+ else if (IsOffset)
+ {
+ if ((Offset + Size) > Chunk.GetSize())
+ {
+ Size = Chunk.GetSize() - Offset;
+ }
+ OutChunk = IoBuffer(Chunk, Offset, Size);
+ OutChunk.SetContentType(ContentType);
+ }
+
+ ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
+
+ return HttpResponseCode::OK;
+}
+
+HttpResponseCode
+ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk)
+{
+ using namespace std::literals;
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ ZEN_INFO("Chunk request for invalid chunk hash '{}'", Cid);
+ return HttpResponseCode::BadRequest;
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+ OutChunk = m_CidStore.FindChunkByCid(Hash);
+
+ if (!OutChunk)
+ {
+ ZEN_DEBUG("chunk - '{}' MISSING", Cid);
+ return HttpResponseCode::NotFound;
+ }
+
+ if (AcceptType == HttpContentType::kBinary)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk));
+ OutChunk = Compressed.Decompress().AsIoBuffer();
+ OutChunk.SetContentType(HttpContentType::kBinary);
+ }
+ else
+ {
+ OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ }
+ return HttpResponseCode::OK;
+}
+
//////////////////////////////////////////////////////////////////////////
HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
@@ -1264,34 +1529,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
// currently not possible for (arbitrary, external) technical reasons
m_Router.RegisterRoute(
"list",
- [this](HttpRouterRequest& Req) {
- m_ProjectStore->DiscoverProjects();
-
- CbWriter Response;
- Response.BeginArray();
-
- m_ProjectStore->IterateProjects([&Response](ProjectStore::Project& Prj) {
- Response.BeginObject();
- Response << "Id"sv << Prj.Identifier;
- Response << "RootDir"sv << Prj.RootDir.string();
- Response << "ProjectRootDir"sv << Prj.ProjectRootDir;
- Response << "EngineRootDir"sv << Prj.EngineRootDir;
- Response << "ProjectFilePath"sv << Prj.ProjectFilePath;
- Response.EndObject();
- });
- Response.EndArray();
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save().AsArray());
- },
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); },
HttpVerb::kGet);
m_Router.RegisterRoute(
"{project}/oplog/{log}/batch",
[this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
if (!Project)
@@ -1412,7 +1658,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
ResponsePtr += sizeof(ResponseChunk);
}
-
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs);
},
HttpVerb::kPost);
@@ -1427,42 +1672,17 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& ProjectId = Req.GetCapture(1);
const auto& OplogId = Req.GetCapture(2);
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
-
HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
const bool FilterClient = Params.GetValue("filter"sv) == "client"sv;
- CbObjectWriter Response;
- Response.BeginArray("files");
-
- Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
- Response.BeginObject();
- Response << "id"sv << Id;
- Response << "clientpath"sv << ClientPath;
- if (!FilterClient)
- {
- Response << "serverpath"sv << ServerPath;
- }
- Response.EndObject();
- });
-
- Response.EndArray();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ CbObject ResponsePayload;
+ HttpResponseCode Response = m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload);
+ if (Response != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(Response);
+ }
+ return HttpReq.WriteResponse(Response, ResponsePayload);
},
HttpVerb::kGet);
@@ -1475,41 +1695,13 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& OplogId = Req.GetCapture(2);
const auto& ChunkId = Req.GetCapture(3);
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
-
- const Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = Log.FindChunk(Obj);
- if (!Chunk)
- {
- m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- uint64_t ChunkSize = Chunk.GetSize();
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
+ CbObject ResponsePayload;
+ HttpResponseCode Response = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload);
+ if (Response != HttpResponseCode::OK)
{
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- ZEN_ASSERT(!Compressed.IsNull());
- ChunkSize = Compressed.GetRawSize();
+ return HttpReq.WriteResponse(Response);
}
-
- CbObjectWriter Response;
- Response << "size"sv << ChunkSize;
- HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save());
+ HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload);
},
HttpVerb::kGet);
@@ -1522,9 +1714,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
const auto& OplogId = Req.GetCapture(2);
const auto& ChunkId = Req.GetCapture(3);
- bool IsOffset = false;
- uint64_t Offset = 0;
- uint64_t Size = ~(0ull);
+ uint64_t Offset = 0;
+ uint64_t Size = ~(0ull);
auto QueryParms = Req.ServerRequest().GetQueryParams();
@@ -1532,8 +1723,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
{
- Offset = OffsetVal.value();
- IsOffset = true;
+ Offset = OffsetVal.value();
}
else
{
@@ -1545,8 +1735,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
{
if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
{
- Size = SizeVal.value();
- IsOffset = true;
+ Size = SizeVal.value();
}
else
{
@@ -1555,87 +1744,16 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
}
HttpContentType AcceptType = HttpReq.AcceptContentType();
- if (AcceptType == HttpContentType::kUnknownContentType)
- {
- AcceptType = HttpContentType::kBinary;
- }
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- m_Log.warn("chunk - '{}/{}/{}' FAILED, missing project", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog& Log = *FoundLog;
- Oid Obj = Oid::FromHexString(ChunkId);
-
- IoBuffer Chunk = Log.FindChunk(Obj);
- if (!Chunk)
- {
- m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId);
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- IoBuffer Value = Chunk;
- HttpContentType ContentType = Chunk.GetContentType();
-
- if (Chunk.GetContentType() == HttpContentType::kCompressedBinary)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
- ZEN_ASSERT(!Compressed.IsNull());
-
- if (IsOffset)
- {
- if ((Offset + Size) > Compressed.GetRawSize())
- {
- Size = Compressed.GetRawSize() - Offset;
- }
- if (AcceptType == HttpContentType::kBinary)
- {
- Value = Compressed.Decompress(Offset, Size).AsIoBuffer();
- ContentType = HttpContentType::kBinary;
- }
- else
- {
- Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer();
- ContentType = HttpContentType::kCompressedBinary;
- }
- }
- else
- {
- if (AcceptType == HttpContentType::kBinary)
- {
- Value = Compressed.Decompress().AsIoBuffer();
- ContentType = HttpContentType::kBinary;
- }
- else
- {
- Value = Compressed.GetCompressed().Flatten().AsIoBuffer();
- ContentType = HttpContentType::kCompressedBinary;
- }
- }
- }
- else if (IsOffset)
+ IoBuffer Chunk;
+ HttpResponseCode Response = m_ProjectStore->GetChunk(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk);
+ if (Response != HttpResponseCode::OK)
{
- if ((Offset + Size) > Chunk.GetSize())
- {
- Size = Chunk.GetSize() - Offset;
- }
- Value = IoBuffer(Chunk, Offset, Size);
+ return HttpReq.WriteResponse(Response);
}
- m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType));
- return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value);
+ m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType()));
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk);
},
HttpVerb::kGet | HttpVerb::kHead);
@@ -1814,7 +1932,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath);
- zen::WriteFile(BadPackagePath, Payload);
+ WriteFile(BadPackagePath, Payload);
return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
}
@@ -1872,7 +1990,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
ProjectStore::Oplog& Oplog = *FoundLog;
- if (const std::optional<int32_t> OpId = zen::ParseInt<uint32_t>(OpIdString))
+ if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString))
{
if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value()))
{
@@ -1910,7 +2028,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
case ZenContentType::kCompressedBinary:
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)))
{
- Package.AddAttachment(CbAttachment(Compressed));
+ Package.AddAttachment(CbAttachment(Compressed, AttachmentHash));
}
else
{
@@ -1942,6 +2060,196 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
HttpVerb::kGet);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/archive",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbObjectWriter Response;
+ Response.BeginArray("entries"sv);
+ std::unordered_set<IoHash> AttachementHashes;
+ size_t OpCount = 0;
+ IoHashStream Hasher;
+
+ FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) {
+ SharedBuffer Buffer = Op.GetBuffer();
+ Hasher.Append(Buffer.GetView());
+ Response << Op;
+ Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsAttachment();
+ AttachementHashes.emplace(AttachmentHash);
+ });
+ OpCount++;
+ });
+ Response.EndArray();
+
+ IoHash Checksum = Hasher.GetHash();
+ Response.AddHash("checksum"sv, Checksum);
+
+ ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'",
+ OpCount,
+ AttachementHashes.size(),
+ ProjectId,
+ OplogId,
+ Checksum);
+
+ CbPackage ResponsePackage;
+ ResponsePackage.SetObject(Response.Save());
+
+ std::vector<CbAttachment> Attachments;
+ Attachments.reserve(AttachementHashes.size());
+ for (const IoHash& AttachmentHash : AttachementHashes)
+ {
+ IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
+ if (Payload)
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
+ ZEN_ASSERT(Compressed);
+ Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash));
+ }
+ }
+ ResponsePackage.AddAttachments(Attachments);
+
+ std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences);
+ const ZenContentType AcceptType = HttpReq.AcceptContentType();
+ if (AcceptType == ZenContentType::kCompressedBinary)
+ {
+ std::vector<SharedBuffer> Parts;
+ Parts.reserve(ResponsePayload.size());
+ for (const auto& I : ResponsePayload)
+ {
+ Parts.emplace_back(SharedBuffer(I));
+ }
+ CompositeBuffer Cmp(std::move(Parts));
+ CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp);
+ HttpReq.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCompressedBinary,
+ CompressedResponse.GetCompressed().Flatten().AsIoBuffer());
+ }
+ else if (AcceptType == ZenContentType::kCbPackage)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+ break;
+ case HttpVerb::kPost:
+ {
+ ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId);
+ IoBuffer CompressedPayload = HttpReq.ReadPayload();
+ IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer();
+
+ CbPackage RequestPackage = ParsePackageMessage(Payload);
+ CbObject Request = RequestPackage.GetObject();
+ IoHash Checksum = Request["checksum"sv].AsHash();
+
+ std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments();
+ zen ::CbArrayView Entries = Request["entries"sv].AsArrayView();
+
+ ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'",
+ Entries.Num(),
+ Attachments.size(),
+ Checksum,
+ ProjectId,
+ OplogId);
+ std::vector<CbObject> Ops;
+ Ops.reserve(Entries.Num());
+ IoHashStream Hasher;
+ for (auto& OpEntry : Entries)
+ {
+ CbObjectView Core = OpEntry.AsObjectView();
+
+ if (!Core["key"sv])
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "No oplog entry key specified");
+ }
+
+ BinaryWriter Writer;
+ Core.CopyTo(Writer);
+ MemoryView OpView = Writer.GetView();
+ Hasher.Append(OpView);
+ IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize());
+ CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+ Ops.emplace_back(Op);
+ }
+ IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
+
+ // Spread over multiple threads?
+ WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
+ std::atomic_int64_t JobCount = 0;
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() {
+ CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
+ JobCount.fetch_add(-1);
+ });
+ }
+ while (JobCount.load())
+ {
+ Sleep(1);
+ }
+
+ ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId);
+ for (const CbObject& Op : Ops)
+ {
+ const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op);
+ ZEN_DEBUG("oplog entry #{}", OpLsn);
+
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'",
+ ProjectId,
+ OplogId,
+ OpLsn,
+ NiceBytes(Op.GetSize()),
+ Op["key"sv].AsString());
+ }
+ ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId);
+ return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
+ }
+ break;
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost | HttpVerb::kGet);
+ m_Router.RegisterRoute(
"{project}/oplog/{log}",
[this](HttpRouterRequest& Req) {
const auto& ProjectId = Req.GetCapture(1);
@@ -2120,7 +2428,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
std::vector<std::string> OpLogs = Project->ScanForOplogs();
CbObjectWriter Response;
- Response << "id"sv << Project->Identifier << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "id"sv << Project->Identifier;
+ Response << "root"sv << PathToUtf8(Project->RootDir);
+ Response << "engine"sv << PathToUtf8(Project->EngineRootDir);
+ Response << "project"sv << PathToUtf8(Project->ProjectRootDir);
+ Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath);
Response.BeginArray("oplogs"sv);
for (const std::string& OplogId : OpLogs)
@@ -2190,24 +2502,32 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request)
namespace testutils {
using namespace std::literals;
- CbPackage CreateOplogPackage(const Oid& Id, const std::span<const CompressedBuffer>& Attachments)
+ std::string OidAsString(const Oid& Id)
+ {
+ StringBuilder<25> OidStringBuilder;
+ Id.ToString(OidStringBuilder);
+ return OidStringBuilder.ToString();
+ }
+
+ CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
CbPackage Package;
CbObjectWriter Object;
- Object << "key"sv << Id;
+ Object << "key"sv << OidAsString(Id);
if (!Attachments.empty())
{
Object.BeginArray("bulkdata");
- for (const CompressedBuffer& Attachment : Attachments)
+ for (const auto& Attachment : Attachments)
{
+ CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash()));
Object.BeginObject();
- Object << "id" << Oid::NewOid();
- Object << "type"
- << "Standard";
- Object << "data" << CbAttachment(Attachment);
+ Object << "id"sv << Attachment.first;
+ Object << "type"sv
+ << "Standard"sv;
+ Object << "data"sv << Attach;
Object.EndObject();
- Package.AddAttachment(CbAttachment(Attachment));
+ Package.AddAttachment(Attach);
}
Object.EndArray();
}
@@ -2215,9 +2535,9 @@ namespace testutils {
return Package;
};
- std::vector<CompressedBuffer> CreateAttachments(const std::span<const size_t>& Sizes)
+ std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
{
- std::vector<CompressedBuffer> Result;
+ std::vector<std::pair<Oid, CompressedBuffer>> Result;
Result.reserve(Sizes.size());
for (size_t Size : Sizes)
{
@@ -2227,12 +2547,28 @@ namespace testutils {
{
Data[Idx] = Idx % 255;
}
-
- Result.emplace_back(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())));
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
+ Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
}
return Result;
}
+ uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ {
+ if (RawOffset > 0)
+ {
+ uint64 BlockSize = 0;
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
+ {
+ return 0;
+ }
+ return BlockSize > 0 ? RawOffset % BlockSize : 0;
+ }
+ return 0;
+ }
+
} // namespace testutils
TEST_CASE("project.store.create")
@@ -2391,6 +2727,91 @@ TEST_CASE("project.store.gc")
}
}
+TEST_CASE("project.store.partial.read")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ ScopedTemporaryDirectory TempDir;
+
+ GcManager Gc;
+ CidStore CidStore(Gc);
+ CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ CidStore.Initialize(CidConfig);
+
+ std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv;
+ ProjectStore ProjectStore(CidStore, BasePath, Gc);
+ std::filesystem::path RootDir = TempDir.Path() / "root"sv;
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv;
+
+ std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv;
+ std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv;
+ {
+ CreateDirectories(Project1FilePath.parent_path());
+ BasicFile ProjectFile;
+ ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate);
+ }
+
+ std::vector<Oid> OpIds;
+ OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()});
+ std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
+ {
+ Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ Project1RootDir.string(),
+ Project1FilePath.string()));
+ ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv);
+ CHECK(Oplog != nullptr);
+ Attachments[OpIds[0]] = {};
+ Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77});
+ Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99});
+ Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122});
+ for (auto It : Attachments)
+ {
+ Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second));
+ }
+ }
+ {
+ IoBuffer Chunk;
+ CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(),
+ HttpContentType::kCompressedBinary,
+ Chunk) == HttpResponseCode::OK);
+ CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize());
+ }
+
+ IoBuffer ChunkResult;
+ CHECK(ProjectStore.GetChunk("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 0,
+ ~0ull,
+ HttpContentType::kCompressedBinary,
+ ChunkResult) == HttpResponseCode::OK);
+ CHECK(ChunkResult);
+ CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize());
+
+ IoBuffer PartialChunkResult;
+ CHECK(ProjectStore.GetChunk("proj1"sv,
+ "oplog1"sv,
+ OidAsString(Attachments[OpIds[2]][1].first),
+ 5,
+ 1773,
+ HttpContentType::kCompressedBinary,
+ PartialChunkResult) == HttpResponseCode::OK);
+ CHECK(PartialChunkResult);
+ CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult));
+ CHECK(PartialCompressedResult.GetRawSize() >= 1773);
+
+ uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5);
+ SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed);
+ SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress();
+ const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]);
+ const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
+ CHECK(FullDataPtr[0] == PartialDataPtr[0]);
+}
#endif
void
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index 00fb613d9..c62e7840d 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -90,6 +90,8 @@ public:
*/
uint32_t AppendNewOplogEntry(CbPackage Op);
+ uint32_t AppendNewOplogEntry(CbObject Core);
+
enum UpdateType
{
kUpdateNewEntry,
@@ -222,6 +224,24 @@ public:
virtual void CollectGarbage(GcContext& GcCtx) override;
virtual GcStorageSize StorageSize() const override;
+ CbArray GetProjectsList();
+ HttpResponseCode GetProjectFiles(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ bool FilterClient,
+ CbObject& OutPayload);
+ HttpResponseCode GetChunkInfo(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ CbObject& OutPayload);
+ HttpResponseCode GetChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view ChunkId,
+ uint64_t Offset,
+ uint64_t Size,
+ ZenContentType AcceptType,
+ IoBuffer& OutChunk);
+ HttpResponseCode GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk);
+
private:
spdlog::logger& m_Log;
CidStore& m_CidStore;
diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp
index 22b06d9c4..988386726 100644
--- a/zenserver/upstream/hordecompute.cpp
+++ b/zenserver/upstream/hordecompute.cpp
@@ -999,7 +999,7 @@ namespace detail {
ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
- CbAttachment Attachment(AttachmentBuffer);
+ CbAttachment Attachment(AttachmentBuffer, DecompressedId);
OutputPackage.AddAttachment(Attachment);
});
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index f9673bcd6..bc06653b9 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -253,7 +253,7 @@ namespace detail {
if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
{
- Package.AddAttachment(CbAttachment(Chunk));
+ Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash()));
}
else
{
@@ -337,7 +337,7 @@ namespace detail {
{
if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
{
- Package.AddAttachment(CbAttachment(Chunk));
+ Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash()));
}
}
});
@@ -1254,7 +1254,7 @@ namespace detail {
{
if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value)))
{
- Package.AddAttachment(CbAttachment(AttachmentBuffer));
+ Package.AddAttachment(CbAttachment(AttachmentBuffer, IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())));
}
else
{
@@ -1312,8 +1312,9 @@ namespace detail {
}
BatchWriter.EndObject();
// Policy unspecified and expected to be Default
- BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash()));
- BatchPackage.AddAttachment(CbAttachment(Compressed));
+ IoHash Hash = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ BatchWriter.AddBinaryAttachment("RawHash"sv, Hash);
+ BatchPackage.AddAttachment(CbAttachment(Compressed, Hash));
}
BatchWriter.EndObject();
}
diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp
index f8fc41341..fdec78c60 100644
--- a/zenstore/cas.cpp
+++ b/zenstore/cas.cpp
@@ -47,7 +47,7 @@ public:
virtual ~CasImpl();
virtual void Initialize(const CidStoreConfiguration& InConfig) override;
- virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override;
+ virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override;
virtual IoBuffer FindChunk(const IoHash& ChunkHash) override;
virtual bool ContainsChunk(const IoHash& ChunkHash) override;
virtual void FilterChunks(HashKeySet& InOutChunks) override;
@@ -191,7 +191,7 @@ CasImpl::UpdateManifest()
}
CasStore::InsertResult
-CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
+CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode)
{
ZEN_TRACE_CPU("CAS::InsertChunk");
@@ -208,7 +208,7 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
return m_SmallStrategy.InsertChunk(Chunk, ChunkHash);
}
- return m_LargeStrategy.InsertChunk(Chunk, ChunkHash);
+ return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode);
}
IoBuffer
diff --git a/zenstore/cas.h b/zenstore/cas.h
index 2ad160d28..9c48d4707 100644
--- a/zenstore/cas.h
+++ b/zenstore/cas.h
@@ -39,15 +39,21 @@ public:
bool New = false;
};
- virtual void Initialize(const CidStoreConfiguration& Config) = 0;
- virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0;
- virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
- virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
- virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
- virtual void Flush() = 0;
- virtual void Scrub(ScrubContext& Ctx) = 0;
- virtual void GarbageCollect(GcContext& GcCtx) = 0;
- virtual CidStoreSize TotalSize() const = 0;
+ enum class InsertMode
+ {
+ kCopyOnly,
+ kMayBeMovedInPlace
+ };
+
+ virtual void Initialize(const CidStoreConfiguration& Config) = 0;
+ virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0;
+ virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0;
+ virtual bool ContainsChunk(const IoHash& ChunkHash) = 0;
+ virtual void FilterChunks(HashKeySet& InOutChunks) = 0;
+ virtual void Flush() = 0;
+ virtual void Scrub(ScrubContext& Ctx) = 0;
+ virtual void GarbageCollect(GcContext& GcCtx) = 0;
+ virtual CidStoreSize TotalSize() const = 0;
protected:
CidStoreConfiguration m_Config;
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 5a079dbed..8b2797ce9 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -23,14 +23,14 @@ struct CidStore::Impl
void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); }
- CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData)
+ CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData, CidStore::InsertMode Mode)
{
const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash());
IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer();
Payload.SetContentType(ZenContentType::kCompressedBinary);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId, static_cast<CasStore::InsertMode>(Mode));
return {.New = Result.New};
}
@@ -78,9 +78,9 @@ CidStore::Initialize(const CidStoreConfiguration& Config)
}
CidStore::InsertResult
-CidStore::AddChunk(const CompressedBuffer& ChunkData)
+CidStore::AddChunk(const CompressedBuffer& ChunkData, InsertMode Mode)
{
- return m_Impl->AddChunk(ChunkData);
+ return m_Impl->AddChunk(ChunkData, Mode);
}
IoBuffer
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index d0a8ad849..9825f225a 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -124,7 +124,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
}
CasStore::InsertResult
-FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
+FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode)
{
ZEN_ASSERT(m_IsInitialized);
@@ -132,6 +132,16 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary);
#endif
+ if (Mode == CasStore::InsertMode::kCopyOnly)
+ {
+ ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash);
+ if (std::filesystem::is_regular_file(Name.ShardedPath.ToPath()))
+ {
+ return {.New = false};
+ }
+ return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash);
+ }
+
// File-based chunks have special case handling whereby we move the file into
// place in the file store directory, thus avoiding unnecessary copying
@@ -153,6 +163,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
if (!Success)
{
+ // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed
+ // to delete it.
ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'",
PathFromHandle(ChunkFileHandle),
GetLastErrorAsString());
@@ -294,6 +306,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
ChunkHash);
DeletePayloadFileOnClose();
+
#elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle);
std::filesystem::path DestPath = Name.ShardedPath.c_str();
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index f14e5d057..de79b8b81 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -34,7 +34,9 @@ struct FileCasStrategy final : public GcStorage
void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore);
CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash);
- CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash);
+ CasStore::InsertResult InsertChunk(IoBuffer Chunk,
+ const IoHash& ChunkHash,
+ CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace);
IoBuffer FindChunk(const IoHash& ChunkHash);
bool HaveChunk(const IoHash& ChunkHash);
void FilterChunks(HashKeySet& InOutChunks);
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 21e3c3160..e8984a83d 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -63,9 +63,14 @@ public:
{
bool New = false;
};
+ enum class InsertMode
+ {
+ kCopyOnly,
+ kMayBeMovedInPlace
+ };
void Initialize(const CidStoreConfiguration& Config);
- InsertResult AddChunk(const CompressedBuffer& ChunkData);
+ InsertResult AddChunk(const CompressedBuffer& ChunkData, InsertMode Mode = InsertMode::kMayBeMovedInPlace);
IoBuffer FindChunkByCid(const IoHash& DecompressedId);
bool ContainsChunk(const IoHash& DecompressedId);
void FilterChunks(HashKeySet& InOutChunks);
diff --git a/zenutil/basicfile.cpp b/zenutil/basicfile.cpp
index fb77f4bd1..1e6043d7e 100644
--- a/zenutil/basicfile.cpp
+++ b/zenutil/basicfile.cpp
@@ -72,7 +72,7 @@ BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::error_cod
break;
}
- const DWORD dwShareMode = FILE_SHARE_READ;
+ const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL;
HANDLE hTemplateFile = nullptr;
diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp
index 0ac6c35ed..1ba2721b6 100644
--- a/zenutil/cache/cacherequests.cpp
+++ b/zenutil/cache/cacherequests.cpp
@@ -268,8 +268,9 @@ namespace cacherequests {
const CompressedBuffer& Buffer = Value.Body;
if (Buffer)
{
- Writer.AddBinaryAttachment("RawHash", IoHash::FromBLAKE3(Buffer.GetRawHash()));
- OutPackage.AddAttachment(CbAttachment(Buffer));
+ IoHash AttachmentHash = IoHash::FromBLAKE3(Buffer.GetRawHash());
+ Writer.AddBinaryAttachment("RawHash", AttachmentHash);
+ OutPackage.AddAttachment(CbAttachment(Buffer, AttachmentHash));
Writer.AddInteger("RawSize", Buffer.GetRawSize());
}
else
@@ -499,16 +500,17 @@ namespace cacherequests {
Writer.BeginArray("Values");
for (const GetCacheRecordResultValue& Value : RecordResult->Values)
{
+ IoHash AttachmentHash = Value.Body ? IoHash::FromBLAKE3(Value.Body.GetRawHash()) : Value.RawHash;
Writer.BeginObject();
{
Writer.AddObjectId("Id", Value.Id);
- Writer.AddHash("RawHash", Value.Body ? IoHash::FromBLAKE3(Value.Body.GetRawHash()) : Value.RawHash);
+ Writer.AddHash("RawHash", AttachmentHash);
Writer.AddInteger("RawSize", Value.Body ? Value.Body.GetRawSize() : Value.RawSize);
}
Writer.EndObject();
if (Value.Body)
{
- OutPackage.AddAttachment(CbAttachment(Value.Body));
+ OutPackage.AddAttachment(CbAttachment(Value.Body, AttachmentHash));
}
}
@@ -594,8 +596,9 @@ namespace cacherequests {
{
return false;
}
- Writer.AddBinaryAttachment("RawHash", IoHash::FromBLAKE3(ValueRequest.Body.GetRawHash()));
- OutPackage.AddAttachment(CbAttachment(ValueRequest.Body));
+ IoHash AttachmentHash = IoHash::FromBLAKE3(ValueRequest.Body.GetRawHash());
+ Writer.AddBinaryAttachment("RawHash", AttachmentHash);
+ OutPackage.AddAttachment(CbAttachment(ValueRequest.Body, AttachmentHash));
}
else if (ValueRequest.RawHash != IoHash::Zero)
{
@@ -797,7 +800,7 @@ namespace cacherequests {
ResponseObject.AddHash("RawHash", ValueResult.RawHash);
if (ValueResult.Body)
{
- OutPackage.AddAttachment(CbAttachment(ValueResult.Body));
+ OutPackage.AddAttachment(CbAttachment(ValueResult.Body, ValueResult.RawHash));
}
else
{