diff options
| author | Dan Engelbrecht <[email protected]> | 2022-11-18 11:35:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-11-18 02:35:13 -0800 |
| commit | 55225621f018904abf7e212320bb784dc64f8ac3 (patch) | |
| tree | 3fb962e9e0553448f9d42612bb078ff072308e1c | |
| parent | move BasicFile to zenutil to remove zenstore dependency from zen command (#190) (diff) | |
| download | zen-55225621f018904abf7e212320bb784dc64f8ac3.tar.xz zen-55225621f018904abf7e212320bb784dc64f8ac3.zip | |
Add `import-project` and `export-project` (#183)
* Add `import-project` and `export-project` command line parsing
29 files changed, 1807 insertions, 405 deletions
diff --git a/zen/cmds/exportproject.cpp b/zen/cmds/exportproject.cpp new file mode 100644 index 000000000..5d8c7d536 --- /dev/null +++ b/zen/cmds/exportproject.cpp @@ -0,0 +1,376 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exportproject.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/stream.h> +#include <zencore/uid.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> +#include <zenutil/basicfile.h> + +#include <memory> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace { + +void +EnsureDirectoryExists(const std::filesystem::path& Path) +{ + while (!std::filesystem::is_directory(Path)) + { + if (Path.has_parent_path()) + { + EnsureDirectoryExists(Path.parent_path()); + } + std::filesystem::create_directory(Path); + } +} + +} // namespace + +ExportProjectCommand::ExportProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); +} + +ExportProjectCommand::~ExportProjectCommand() = default; + +bool +ExportProjectCommand::IsSuccess(const cpr::Response& Response, const std::string_view Operation) +{ + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + if (Response.status_code) + { + ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text); + } + else + { + ZEN_ERROR("{} failed: {}", Operation, Response.error.message); + } + + return false; + } + return true; +} + +int +ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + m_Options.parse_positional({"target", "project", "oplog"}); + m_Options.parse(argc, argv); + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_TargetPath.empty()) + { + ZEN_ERROR("Target path must be given"); + return 1; + } + + if (!std::filesystem::exists(m_TargetPath)) + { + zen::CreateDirectories(m_TargetPath); + } + else if (!std::filesystem::is_directory(m_TargetPath)) + { + ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath); + return 1; + } + + const std::string UrlBase = fmt::format("{}/prj", m_HostName); + + cpr::Session Session; + { + ZEN_INFO("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); + + std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); + Session.SetUrl({ProjectRequest}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, ProjectRequest)) + { + return 1; + } + zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::BasicFile ProjectStore; + ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate); + ProjectStore.Write(Payload.GetView(), 0); + + if (m_OplogNames.empty()) + { + zen::CbObject Params = LoadCompactBinaryObject(Payload); + zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView(); + for (auto& OplogEntry : Oplogs) + { + std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString(); + m_OplogNames.push_back(std::string(OpLog)); + } + } + } + + std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks; + std::vector<zen::CbAttachment> AllAttachments; + std::vector<zen::CbPackage> OplogResponses; + for (const std::string& OplogName : m_OplogNames) + { + ZEN_INFO("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath); + + std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); + Session.SetUrl({GetOplogArchiveRequest}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, GetOplogArchiveRequest)) + { + return 1; + } + zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressed(zen::SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); + + OplogResponses.emplace_back(zen::ParsePackageMessage(Payload)); + zen::CbPackage& ResponsePackage = OplogResponses.back(); + zen::CbObject Result = ResponsePackage.GetObject(); + + zen::IoHash Checksum = Result["checksum"sv].AsHash(); + zen ::CbArrayView Entries = Result["entries"sv].AsArrayView(); + + ZEN_INFO("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'", + Entries.Num(), + m_ProjectName, + OplogName, + Checksum, + m_TargetPath); + { + zen::BasicFile OpStore; + OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate); + OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum}; + OpStore.Write(&Header, sizeof(OplogHeader), 0); + std::vector<OplogEntry> OpEntries; + OpEntries.resize(Entries.Num()); + const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry); + uint64_t BulkOffset = DataOffset; + + zen::IoHashStream Hasher; + + for (uint64_t OpIndex = 0; auto& OpEntry : Entries) + { + zen::BinaryWriter Writer; + OpEntry.CopyTo(Writer); + zen::MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + + OpEntries[OpIndex].Offset = BulkOffset; + OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize()); + OpStore.Write(OpView, BulkOffset); + BulkOffset += OpView.GetSize(); + OpIndex++; + } + zen::IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return 1; + } + OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader)); + } + + std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments(); + AllAttachments.reserve(AllAttachments.size() + Attachments.size()); + AllAttachments.reserve(UniqueChunks.size() + Attachments.size()); + for (const zen::CbAttachment& Attachment : Attachments) + { + if (UniqueChunks.insert(Attachment.GetHash()).second) + { + AllAttachments.push_back(Attachment); + } + } + + ZEN_INFO("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName); + } + + size_t ChunkCount = AllAttachments.size(); + zen::BasicFile ChunkStoreIndex; + ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate); + ChunksHeader Header = {.ChunkCount = ChunkCount}; + ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0); + std::vector<ChunkEntry> ChunkEntries; + ChunkEntries.resize(ChunkCount); + uint64_t ChunkOffset = 0; + + zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + std::atomic_int64_t JobCount = 0; + std::vector<size_t> BlockChunkIndexes; + const size_t BlockSize = 1ull << Header.BlockSizeShift; + uint32_t CurrentBlockIndex = 0; + + auto WriteBlockAsync = [](const std::string& TargetPath, + size_t WriteBlockOffset, + uint32_t BlockIndex, + const std::vector<size_t>& BlockChunkIndexes, + const std::vector<ChunkEntry>& ChunkEntries, + const std::vector<zen::CbAttachment>& Attachments, + zen::WorkerThreadPool& WorkerPool, + std::atomic_int64_t& JobCount) { + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() { + zen::BasicFile ChunkBlock; + ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate); + for (size_t ChunkIndex : BlockChunkIndexes) + { + const ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed(); + size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset); + AttachmentBulkOffset += SegmentSize; + } + } + JobCount.fetch_add(-1); + }); + }; + + ZEN_INFO("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments) + { + ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + Chunk.ChunkHash = Attachment.GetHash(); + zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed(); + Chunk.Length = AttachmentBody.GetSize(); + + if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize); + if (BlockIndex != CurrentBlockIndex) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + + ChunkOffset = BlockIndex * BlockSize; + CurrentBlockIndex = BlockIndex; + BlockChunkIndexes.clear(); + } + + Chunk.Offset = ChunkOffset; + ChunkOffset = Chunk.Offset + Chunk.Length; + BlockChunkIndexes.push_back(ChunkIndex); + } + else + { + Chunk.Offset = ~0ull; + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() { + std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash); + EnsureDirectoryExists(Path.parent_path()); + zen::BasicFile ChunkFile; + ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate); + uint64_t Offset = 0; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset); + Offset += SegmentSize; + } + JobCount.fetch_add(-1); + }); + } + ChunkIndex++; + } + if (!BlockChunkIndexes.empty()) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + } + + while (JobCount.load()) + { + zen::Sleep(1); + } + + ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader)); + + ZEN_INFO("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + + return 0; +} + +std::filesystem::path +ExportProjectCommand::GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog) +{ + return RootPath / (Oplog + ".ops"); +} + +std::filesystem::path +ExportProjectCommand::GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash) +{ + zen::ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(RootPath.c_str()); + zen::ExtendableStringBuilder<64> HashString; + OpHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); +} + +std::filesystem::path +ExportProjectCommand::GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName) +{ + return RootPath / (std::string(ProjectName) + ".zcb"); +} + +std::filesystem::path +ExportProjectCommand::GetChunksIndexPath(const std::filesystem::path RootPath) +{ + return RootPath / "chunks.idx"; +} + +std::filesystem::path +ExportProjectCommand::GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex) +{ + return RootPath / fmt::format("chunks{}.bin", BlockIndex); +} diff --git a/zen/cmds/exportproject.h b/zen/cmds/exportproject.h new file mode 100644 index 000000000..bf41e2d7b --- /dev/null +++ b/zen/cmds/exportproject.h @@ -0,0 +1,79 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +#include <zencore/filesystem.h> +#include <zencore/iohash.h> + +#include <functional> + +namespace zen { +class CbObjectView; +} +namespace cpr { +class Response; +} + +class ExportProjectCommand : public ZenCmdBase +{ +public: + ExportProjectCommand(); + ~ExportProjectCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + + struct OplogHeader + { + enum uint32 + { + kMagic = 0x7816'B013 + }; + uint32_t Magic = kMagic; + uint32_t HeaderSize = sizeof(OplogHeader); + uint64_t OpCount = 0; + zen::IoHash Checksum = zen::IoHash::Zero; + }; + struct OplogEntry + { + uint64_t Offset; + uint32_t OpLength; + }; + + struct ChunksHeader + { + enum uint32 + { + kMagic = 0x574C'B016 + }; + uint32_t Magic = kMagic; + uint64_t ChunkCount = 0; + uint8_t BlockSizeShift = 31u; + uint8_t Reserved1 = 0; + uint16_t Reserved2 = 0; + uint32_t Reserved3 = 0; + }; + struct ChunkEntry + { + zen::IoHash ChunkHash; + uint64_t Offset; + uint64_t Length; + }; + + static std::filesystem::path GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog); + static std::filesystem::path GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash); + static std::filesystem::path GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName); + static std::filesystem::path GetChunksIndexPath(const std::filesystem::path RootPath); + static std::filesystem::path GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex); + + static bool IsSuccess(const cpr::Response& Response, const std::string_view Operation); + +private: + cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"}; + std::string m_HostName; + std::string m_TargetPath; + std::string m_ProjectName; + std::vector<std::string> m_OplogNames; +}; diff --git a/zen/cmds/importproject.cpp b/zen/cmds/importproject.cpp new file mode 100644 index 000000000..d6c48cdeb --- /dev/null +++ b/zen/cmds/importproject.cpp @@ -0,0 +1,246 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "importproject.h" +#include "exportproject.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/stream.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> +#include <zenutil/basicfile.h> + +#include <memory> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +ZEN_THIRD_PARTY_INCLUDES_END + +ImportProjectCommand::ImportProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); +} + +ImportProjectCommand::~ImportProjectCommand() = default; + +int +ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + m_Options.parse_positional({"source", "project", "oplog"}); + m_Options.parse(argc, argv); + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_SourcePath.empty()) + { + ZEN_ERROR("Source path must be given"); + return 1; + } + + if (!std::filesystem::is_directory(m_SourcePath)) + { + ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath); + return 1; + } + + const std::string UrlBase = fmt::format("{}/prj", m_HostName); + + cpr::Session Session; + + { + ZEN_INFO("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); + + zen::BasicFile ProjectStore; + ProjectStore.Open(ExportProjectCommand::GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead); + zen::IoBuffer Payload = ProjectStore.ReadAll(); + + std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); + Session.SetUrl({ProjectRequest}); + Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); + cpr::Response Response = Session.Post(); + if (!ExportProjectCommand::IsSuccess(Response, ProjectRequest)) + { + return 1; + } + if (m_OplogNames.empty()) + { + zen::DirectoryContent Content; + zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content); + for (auto& File : Content.Files) + { + if (File.extension() == ".ops") + { + m_OplogNames.push_back(File.stem().string()); + } + } + } + } + + zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksIndexPath(m_SourcePath)); + zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ExportProjectCommand::ChunksHeader)); + const ExportProjectCommand::ChunksHeader* ChunksHeader = + reinterpret_cast<const ExportProjectCommand::ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData()); + + if (ChunksHeader->Magic != ExportProjectCommand::ChunksHeader::kMagic) + { + ZEN_ERROR("Invalid chunk index header"); + return 1; + } + const size_t BlockSize = 1ull << ChunksHeader->BlockSizeShift; + + zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex, + sizeof(ExportProjectCommand::ChunksHeader), + sizeof(ExportProjectCommand::ChunkEntry) * ChunksHeader->ChunkCount); + const ExportProjectCommand::ChunkEntry* ChunkEntries = + reinterpret_cast<const ExportProjectCommand::ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData()); + + for (const std::string& OplogName : m_OplogNames) + { + ZEN_INFO("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName); + std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName); + Session.SetUrl(GetOplogRequest); + cpr::Response OplogResponse = Session.Get(); + if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) + { + OplogResponse = Session.Post(); + if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest)) + { + return 1; + } + ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest); + OplogResponse = Session.Get(); + if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest)) + { + return 1; + } + } + + zen::BasicFile OpStore; + OpStore.Open(ExportProjectCommand::GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead); + ExportProjectCommand::OplogHeader OplogHeader; + OpStore.Read(&OplogHeader, sizeof(ExportProjectCommand::OplogHeader), 0); + if (OplogHeader.Magic != ExportProjectCommand::OplogHeader::kMagic || + OplogHeader.HeaderSize != sizeof(ExportProjectCommand::OplogHeader)) + { + ZEN_ERROR("Invalid oplog header"); + return 1; + } + zen::IoHash Checksum = OplogHeader.Checksum; + std::vector<ExportProjectCommand::OplogEntry> OpEntries; + OpEntries.resize(OplogHeader.OpCount); + OpStore.Read(OpEntries.data(), + sizeof(ExportProjectCommand::OplogEntry) * OplogHeader.OpCount, + sizeof(ExportProjectCommand::OplogHeader)); + + ZEN_INFO("Constructing oplog with {} ops with checksum '{}' for '{}/{}'", + OpEntries.size(), + OplogHeader.Checksum, + m_ProjectName, + OplogName); + zen::IoHashStream Hasher; + zen::CbObjectWriter Request; + Request.BeginArray("entries"sv); + for (auto& OpEntry : OpEntries) + { + zen::IoBuffer CoreData(OpEntry.OpLength); + OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset); + Hasher.Append(CoreData.GetView()); + zen::SharedBuffer SharedCoreData(CoreData); + + zen::CbObject Op(SharedCoreData); + Request << Op; + } + Request.EndArray(); + zen::IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return 1; + } + Request.AddHash("checksum"sv, Checksum); + + zen::CbPackage RequestPackage; + RequestPackage.SetObject(Request.Save()); + + ZEN_INFO("Assembling {} attachments", ChunksHeader->ChunkCount); + std::vector<zen::CbAttachment> Attachments; + Attachments.reserve(ChunksHeader->ChunkCount); + uint32_t ReadBlockIndex = 0; + zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex)); + for (uint64_t ChunkIndex = 0; ChunkIndex < ChunksHeader->ChunkCount; ++ChunkIndex) + { + const ExportProjectCommand::ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex]; + if (ChunkEntry.Offset == ~0ull) + { + zen::IoBuffer ChunkBuffer = + zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash)); + zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); + ZEN_ASSERT(Chunk); + Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); + } + else + { + uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize); + if (BlockIndex != ReadBlockIndex) + { + ReadBlockIndex = BlockIndex; + BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex)); + ZEN_ASSERT(BlockStore); + } + size_t BlockOffset = BlockIndex * BlockSize; + size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset; + zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length); + zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); + Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); + } + } + RequestPackage.AddAttachments(Attachments); + + ZEN_INFO("Sending oplog with {} ops and {} attachments for '{}/{}' to {}", + OpEntries.size(), + ChunksHeader->ChunkCount, + m_ProjectName, + OplogName, + m_HostName); + std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences); + std::vector<zen::SharedBuffer> Parts; + Parts.reserve(RequestPayload.size()); + for (const auto& I : RequestPayload) + { + Parts.emplace_back(zen::SharedBuffer(I)); + } + zen::CompositeBuffer Cmp(std::move(Parts)); + zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp); + + std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); + Session.SetUrl(AppendOplogRequest); + + zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer(); + Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()}); + cpr::Response Response = Session.Post(); + if (!ExportProjectCommand::IsSuccess(Response, AppendOplogRequest)) + { + return 1; + } + + ZEN_INFO("Imported {} ops and {} chunks", OpEntries.size(), ChunksHeader->ChunkCount); + } + return 0; +} diff --git a/zen/cmds/importproject.h b/zen/cmds/importproject.h new file mode 100644 index 000000000..326ccfe92 --- /dev/null +++ b/zen/cmds/importproject.h @@ -0,0 +1,22 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +class ImportProjectCommand : public ZenCmdBase +{ +public: + ImportProjectCommand(); + ~ImportProjectCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"}; + std::string m_HostName; + std::string m_SourcePath; + std::string m_ProjectName; + std::vector<std::string> m_OplogNames; +}; diff --git a/zen/zen.cpp b/zen/zen.cpp index 5abe4b04e..a736d740e 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -9,7 +9,9 @@ #include "cmds/cache.h" #include "cmds/copy.h" #include "cmds/dedup.h" +#include "cmds/exportproject.h" #include "cmds/hash.h" +#include "cmds/importproject.h" #include "cmds/print.h" #include "cmds/run.h" #include "cmds/status.h" @@ -103,6 +105,7 @@ main(int argc, char** argv) #endif zen::logging::InitializeLogging(); + MaximizeOpenFileCount(); ////////////////////////////////////////////////////////////////////////// @@ -115,13 +118,15 @@ main(int argc, char** argv) #if ZEN_WITH_EXEC_SERVICES RunCommand RunCmd; #endif - StatusCommand StatusCmd; - TopCommand TopCmd; - PrintCommand PrintCmd; - PrintPackageCommand PrintPkgCmd; - PsCommand PsCmd; - UpCommand UpCmd; - DownCommand DownCmd; + StatusCommand StatusCmd; + TopCommand TopCmd; + PrintCommand PrintCmd; + PrintPackageCommand PrintPkgCmd; + PsCommand PsCmd; + UpCommand UpCmd; + DownCommand DownCmd; + ExportProjectCommand ExportProjectCmd; + ImportProjectCommand ImportProjectCmd; #if ZEN_WITH_TESTS RunTestsCommand RunTestsCmd; @@ -138,7 +143,9 @@ main(int argc, char** argv) {"copy", &CopyCmd, "Copy file(s)"}, {"dedup", &DedupCmd, "Dedup files"}, {"drop", &DropCmd, "Drop cache bucket(s)"}, + {"export-project", &ExportProjectCmd, "Export project store oplog"}, {"hash", &HashCmd, "Compute file hashes"}, + {"import-project", &ImportProjectCmd, "Import project store oplog"}, {"print", &PrintCmd, "Print compact binary object"}, {"printpackage", &PrintPkgCmd, "Print compact binary package"}, #if ZEN_WITH_EXEC_SERVICES diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index 0e4a46fa1..0db9f02ea 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -839,10 +839,10 @@ CbFieldView::CopyTo(MutableMemoryView Buffer) const void CbFieldView::CopyTo(BinaryWriter& Ar) const { - const MemoryView Source = GetViewNoType(); + const MemoryView SourceView = GetViewNoType(); CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(Type); - Ar.Write(&SerializedType, sizeof(SerializedType)); - Ar.Write(Source.GetData(), Source.GetSize()); + const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType)); + Ar.Write({TypeView, SourceView}); } MemoryView @@ -948,10 +948,10 @@ CbArrayView::CopyTo(MutableMemoryView Buffer) const void CbArrayView::CopyTo(BinaryWriter& Ar) const { - const MemoryView Source = GetPayloadView(); - CbFieldType SerializedType = CbFieldTypeOps::GetType(GetType()); - Ar.Write(&SerializedType, sizeof(SerializedType)); - Ar.Write(Source.GetData(), Source.GetSize()); + const MemoryView SourceView = GetPayloadView(); + CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(GetType()); + const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType)); + Ar.Write({TypeView, SourceView}); } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -988,7 +988,7 @@ CbObjectView::VisitFields(ICbVisitor&) CbFieldView CbObjectView::FindView(const std::string_view Name) const { - for (const CbFieldView Field : *this) + for (const CbFieldView& Field : *this) { if (Name == Field.GetName()) { @@ -1001,7 +1001,7 @@ CbObjectView::FindView(const std::string_view Name) const CbFieldView CbObjectView::FindViewIgnoreCase(const std::string_view Name) const { - for (const CbFieldView Field : *this) + for (const CbFieldView& Field : *this) { if (Name == Field.GetName()) { @@ -1061,10 +1061,10 @@ CbObjectView::CopyTo(MutableMemoryView Buffer) const void CbObjectView::CopyTo(BinaryWriter& Ar) const { - const MemoryView Source = GetPayloadView(); - CbFieldType SerializedType = CbFieldTypeOps::GetType(GetType()); - Ar.Write(&SerializedType, sizeof(SerializedType)); - Ar.Write(Source.GetData(), Source.GetSize()); + const MemoryView SourceView = GetPayloadView(); + CbFieldType SerializedType = CbFieldTypeOps::GetSerializedType(GetType()); + const MemoryView TypeView(reinterpret_cast<const uint8_t*>(&SerializedType), sizeof(SerializedType)); + Ar.Write({TypeView, SourceView}); } ////////////////////////////////////////////////////////////////////////// diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp index a25466bed..19675b9cf 100644 --- a/zencore/compactbinarypackage.cpp +++ b/zencore/compactbinarypackage.cpp @@ -11,7 +11,7 @@ namespace zen { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -CbAttachment::CbAttachment(const CompressedBuffer& InValue) : CbAttachment(InValue.MakeOwned()) +CbAttachment::CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash) : CbAttachment(InValue.MakeOwned(), Hash) { } @@ -19,36 +19,40 @@ CbAttachment::CbAttachment(const SharedBuffer& InValue) : CbAttachment(Composite { } -CbAttachment::CbAttachment(const SharedBuffer& InValue, [[maybe_unused]] const IoHash& InHash) -: CbAttachment(CompositeBuffer(InValue), InHash) +CbAttachment::CbAttachment(const SharedBuffer& InValue, const IoHash& InHash) : CbAttachment(CompositeBuffer(InValue), InHash) { } -CbAttachment::CbAttachment(const CompositeBuffer& InValue) : Value{std::in_place_type<BinaryValue>, InValue} +CbAttachment::CbAttachment(const CompositeBuffer& InValue) +: Hash(InValue.IsNull() ? IoHash::Zero : IoHash::HashBuffer(InValue)) +, Value(InValue) { - if (std::get<BinaryValue>(Value).Buffer.IsNull()) + if (std::get<CompositeBuffer>(Value).IsNull()) { Value.emplace<std::nullptr_t>(); } } -CbAttachment::CbAttachment(CompositeBuffer&& InValue) : Value{std::in_place_type<BinaryValue>, InValue} +CbAttachment::CbAttachment(CompositeBuffer&& InValue) +: Hash(InValue.IsNull() ? IoHash::Zero : IoHash::HashBuffer(InValue)) +, Value(std::move(InValue)) + { - if (std::get<BinaryValue>(Value).Buffer.IsNull()) + if (std::get<CompositeBuffer>(Value).IsNull()) { Value.emplace<std::nullptr_t>(); } } -CbAttachment::CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash) : Value{std::in_place_type<BinaryValue>, InValue, Hash} +CbAttachment::CbAttachment(CompositeBuffer&& InValue, const IoHash& InHash) : Hash(InHash), Value(InValue) { - if (std::get<BinaryValue>(Value).Buffer.IsNull()) + if (std::get<CompositeBuffer>(Value).IsNull()) { Value.emplace<std::nullptr_t>(); } } -CbAttachment::CbAttachment(CompressedBuffer&& InValue) : Value(std::in_place_type<CompressedBuffer>, InValue) +CbAttachment::CbAttachment(CompressedBuffer&& InValue, const IoHash& InHash) : Hash(InHash), Value(InValue) { if (std::get<CompressedBuffer>(Value).IsNull()) { @@ -61,11 +65,13 @@ CbAttachment::CbAttachment(const CbObject& InValue, const IoHash* const InHash) auto SetValue = [&](const CbObject& ValueToSet) { if (InHash) { - Value.emplace<CbObjectValue>(ValueToSet, *InHash); + Value.emplace<CbObject>(ValueToSet); + Hash = *InHash; } else { - Value.emplace<CbObjectValue>(ValueToSet, ValueToSet.GetHash()); + Value.emplace<CbObject>(ValueToSet); + Hash = ValueToSet.GetHash(); } }; @@ -94,7 +100,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields) if (const CbObjectView ObjectView = Fields.AsObjectView(); !Fields.HasError()) { // Is a null object or object not prefixed with a precomputed hash value - Value.emplace<CbObjectValue>(CbObject(ObjectView, Fields.GetOuterBuffer()), ObjectView.GetHash()); + Value.emplace<CbObject>(CbObject(ObjectView, Fields.GetOuterBuffer())); + Hash = ObjectView.GetHash(); ++Fields; } else if (const IoHash ObjectAttachmentHash = Fields.AsObjectAttachment(); !Fields.HasError()) @@ -106,7 +113,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields) { return false; } - Value.emplace<CbObjectValue>(CbObject(InnerObjectView, Fields.GetOuterBuffer()), ObjectAttachmentHash); + Value.emplace<CbObject>(CbObject(InnerObjectView, Fields.GetOuterBuffer())); + Hash = ObjectAttachmentHash; ++Fields; } else if (const IoHash BinaryAttachmentHash = Fields.AsBinaryAttachment(); !Fields.HasError()) @@ -118,7 +126,8 @@ CbAttachment::TryLoad(CbFieldIterator& Fields) { return false; } - Value.emplace<BinaryValue>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()), BinaryAttachmentHash); + Value.emplace<CompositeBuffer>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())); + Hash = BinaryAttachmentHash; ++Fields; } else if (MemoryView BinaryView = Fields.AsBinaryView(); !Fields.HasError()) @@ -126,14 +135,17 @@ CbAttachment::TryLoad(CbFieldIterator& Fields) if (BinaryView.GetSize() > 0) { // Is a compressed binary blob - Value.emplace<CompressedBuffer>( - CompressedBuffer::FromCompressed(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())).MakeOwned()); + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())).MakeOwned(); + Value.emplace<CompressedBuffer>(Compressed); + Hash = IoHash::FromBLAKE3(Compressed.GetRawHash()); ++Fields; } else { // Is an uncompressed empty binary blob - Value.emplace<BinaryValue>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer()), IoHash::HashBuffer(nullptr, 0)); + Value.emplace<CompositeBuffer>(SharedBuffer::MakeView(BinaryView, Fields.GetOuterBuffer())); + Hash = IoHash::HashBuffer(nullptr, 0); ++Fields; } } @@ -179,7 +191,8 @@ TryLoad_ArchiveFieldIntoAttachment(CbAttachment& TargetAttachment, CbField&& Fie if (Buffer.GetSize() > 0) { // Is a compressed binary blob - TargetAttachment = CbAttachment(CompressedBuffer::FromCompressed(std::move(Buffer))); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(std::move(Buffer)); + TargetAttachment = CbAttachment(Compressed, IoHash::FromBLAKE3(Compressed.GetRawHash())); } else { @@ -205,25 +218,25 @@ CbAttachment::TryLoad(BinaryReader& Reader, BufferAllocator Allocator) void CbAttachment::Save(CbWriter& Writer) const { - if (const CbObjectValue* ObjValue = std::get_if<CbObjectValue>(&Value)) + if (const CbObject* Object = std::get_if<CbObject>(&Value)) { - if (ObjValue->Object) + if (*Object) { - Writer.AddObjectAttachment(ObjValue->Hash); + Writer.AddObjectAttachment(Hash); } - Writer.AddObject(ObjValue->Object); + Writer.AddObject(*Object); } - else if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value)) + else if (const CompositeBuffer* Binary = std::get_if<CompositeBuffer>(&Value)) { - if (BinValue->Buffer.GetSize() > 0) + if (Binary->GetSize() > 0) { - Writer.AddBinaryAttachment(BinValue->Hash); + Writer.AddBinaryAttachment(Hash); } - Writer.AddBinary(BinValue->Buffer); + Writer.AddBinary(*Binary); } - else if (const CompressedBuffer* BufferValue = std::get_if<CompressedBuffer>(&Value)) + else if (const CompressedBuffer* Compressed = std::get_if<CompressedBuffer>(&Value)) { - Writer.AddBinary(BufferValue->GetCompressed()); + Writer.AddBinary(Compressed->GetCompressed()); } } @@ -244,7 +257,7 @@ CbAttachment::IsNull() const bool CbAttachment::IsBinary() const { - return std::holds_alternative<BinaryValue>(Value); + return std::holds_alternative<CompositeBuffer>(Value); } bool @@ -256,36 +269,21 @@ CbAttachment::IsCompressedBinary() const bool CbAttachment::IsObject() const { - return std::holds_alternative<CbObjectValue>(Value); + return std::holds_alternative<CbObject>(Value); } IoHash CbAttachment::GetHash() const { - if (const CompressedBuffer* Buffer = std::get_if<CompressedBuffer>(&Value)) - { - return IoHash::FromBLAKE3(Buffer->GetRawHash()); - } - - if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value)) - { - return BinValue->Hash; - } - - if (const CbObjectValue* ObjectValue = std::get_if<CbObjectValue>(&Value)) - { - return ObjectValue->Hash; - } - - return IoHash::Zero; + return Hash; } CompositeBuffer CbAttachment::AsCompositeBinary() const { - if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value)) + if (const CompositeBuffer* BinValue = std::get_if<CompositeBuffer>(&Value)) { - return BinValue->Buffer; + return *BinValue; } return CompositeBuffer::Null; @@ -294,9 +292,9 @@ CbAttachment::AsCompositeBinary() const SharedBuffer CbAttachment::AsBinary() const { - if (const BinaryValue* BinValue = std::get_if<BinaryValue>(&Value)) + if (const CompositeBuffer* BinValue = std::get_if<CompositeBuffer>(&Value)) { - return BinValue->Buffer.Flatten(); + return BinValue->Flatten(); } return {}; @@ -305,9 +303,9 @@ CbAttachment::AsBinary() const CompressedBuffer CbAttachment::AsCompressedBinary() const { - if (const CompressedBuffer* Buffer = std::get_if<CompressedBuffer>(&Value)) + if (const CompressedBuffer* CompValue = std::get_if<CompressedBuffer>(&Value)) { - return *Buffer; + return *CompValue; } return CompressedBuffer::Null; @@ -317,9 +315,9 @@ CbAttachment::AsCompressedBinary() const CbObject CbAttachment::AsObject() const { - if (const CbObjectValue* ObjectValue = std::get_if<CbObjectValue>(&Value)) + if (const CbObject* ObjectValue = std::get_if<CbObject>(&Value)) { - return ObjectValue->Object; + return *ObjectValue; } return {}; @@ -377,6 +375,19 @@ CbPackage::AddAttachment(const CbAttachment& Attachment, AttachmentResolver* Res } } +void +CbPackage::AddAttachments(std::span<const CbAttachment> InAttachments) +{ + if (InAttachments.empty()) + { + return; + } + // Assume we have no duplicates! + Attachments.insert(Attachments.end(), InAttachments.begin(), InAttachments.end()); + std::sort(Attachments.begin(), Attachments.end()); + ZEN_ASSERT_SLOW(std::unique(Attachments.begin(), Attachments.end()) == Attachments.end()); +} + int32_t CbPackage::RemoveAttachment(const IoHash& Hash) { @@ -710,7 +721,7 @@ namespace legacy { { return false; } - Package.AddAttachment(CbAttachment(Compressed)); + Package.AddAttachment(CbAttachment(Compressed, Hash)); } else { @@ -738,7 +749,7 @@ namespace legacy { { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(AttachmentData)) { - Package.AddAttachment(CbAttachment(Compressed)); + Package.AddAttachment(CbAttachment(Compressed, Hash)); } else { @@ -978,15 +989,15 @@ TEST_CASE("usonpackage.serialization") { using namespace std::literals; - const auto TestSaveLoadValidate = [&](const char* Test, const CbPackage& Package) { + const auto TestSaveLoadValidate = [&](const char* Test, CbPackage& InOutPackage) { ZEN_UNUSED(Test); CbWriter Writer; - Package.Save(Writer); + InOutPackage.Save(Writer); CbFieldIterator Fields = Writer.Save(); BinaryWriter MemStream; - Package.Save(MemStream); + InOutPackage.Save(MemStream); CHECK(MakeMemoryView(MemStream).EqualBytes(Fields.GetRangeBuffer().GetView())); CHECK(ValidateCompactBinaryRange(MakeMemoryView(MemStream), CbValidateMode::All) == CbValidateError::None); @@ -995,13 +1006,14 @@ TEST_CASE("usonpackage.serialization") CbPackage FromFields; FromFields.TryLoad(Fields); CHECK_FALSE(bool(Fields)); - CHECK(FromFields == Package); + CHECK(FromFields == InOutPackage); CbPackage FromArchive; BinaryReader ReadAr(MakeMemoryView(MemStream)); FromArchive.TryLoad(ReadAr); CHECK(ReadAr.CurrentOffset() == ReadAr.Size()); - CHECK(FromArchive == Package); + CHECK(FromArchive == InOutPackage); + InOutPackage = FromArchive; }; SUBCASE("Empty") @@ -1083,7 +1095,7 @@ TEST_CASE("usonpackage.serialization") const CbAttachment* const Object2Attachment = Package.FindAttachment(Object2.GetHash()); CHECK((Object1Attachment && Object1Attachment->AsObject().Equals(Object1))); - CHECK((Object2Attachment && Object2Attachment->AsBinary() == Object2.GetBuffer())); + CHECK((Object2Attachment && Object2Attachment->AsBinary().GetView().EqualBytes(Object2.GetBuffer().GetView()))); SharedBuffer Object1ClonedBuffer = SharedBuffer::Clone(Object1.GetOuterBuffer()); Package.AddAttachment(CbAttachment(Object1ClonedBuffer)); diff --git a/zencore/compositebuffer.cpp b/zencore/compositebuffer.cpp index 3190ca5ea..e4ca93cc2 100644 --- a/zencore/compositebuffer.cpp +++ b/zencore/compositebuffer.cpp @@ -124,6 +124,100 @@ CompositeBuffer::ViewOrCopyRange(uint64_t Offset, uint64_t Size, UniqueBuffer& C return View; } +CompositeBuffer::Iterator +CompositeBuffer::GetIterator(uint64_t Offset) const +{ + size_t SegmentCount = m_Segments.size(); + size_t SegmentIndex = 0; + while (SegmentIndex < SegmentCount) + { + size_t SegmentSize = m_Segments[SegmentIndex].GetSize(); + if (Offset < SegmentSize) + { + return {.SegmentIndex = SegmentIndex, .OffsetInSegment = Offset}; + } + Offset -= SegmentSize; + SegmentIndex++; + } + return {.SegmentIndex = ~0ull, .OffsetInSegment = ~0ull}; +} + +MemoryView +CompositeBuffer::ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& CopyBuffer) const +{ + MutableMemoryView WriteView; + size_t SegmentCount = m_Segments.size(); + ZEN_ASSERT(It.SegmentIndex < SegmentCount); + uint64_t SizeLeft = Size; + while (SizeLeft > 0 && It.SegmentIndex < SegmentCount) + { + const SharedBuffer& Segment = m_Segments[It.SegmentIndex]; + size_t SegmentSize = Segment.GetSize(); + if (Size == SizeLeft && Size <= (SegmentSize - It.OffsetInSegment)) + { + MemoryView View = Segment.GetView(); + View.RightChopInline(It.OffsetInSegment); + View.LeftInline(SizeLeft); + It.OffsetInSegment += SizeLeft; + ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize); + if (It.OffsetInSegment == SegmentSize) + { + It.SegmentIndex++; + It.OffsetInSegment = 0; + } + return View; + } + if (WriteView.GetSize() == 0) + { + if (CopyBuffer.GetSize() < Size) + { + CopyBuffer = UniqueBuffer::Alloc(Size); + } + WriteView = CopyBuffer.GetMutableView(); + } + size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft); + MemoryView ReadView = Segment.GetView(); + ReadView.RightChopInline(It.OffsetInSegment); + ReadView.LeftInline(CopySize); + WriteView = WriteView.CopyFrom(ReadView); + It.OffsetInSegment += CopySize; + ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize); + if (It.OffsetInSegment == SegmentSize) + { + It.SegmentIndex++; + It.OffsetInSegment = 0; + } + SizeLeft -= CopySize; + } + return CopyBuffer.GetView().Left(Size - SizeLeft); +} + +void +CompositeBuffer::CopyTo(MutableMemoryView WriteView, Iterator& It) const +{ + size_t SizeLeft = WriteView.GetSize(); + size_t SegmentCount = m_Segments.size(); + ZEN_ASSERT(It.SegmentIndex < SegmentCount); + while (WriteView.GetSize() > 0 && It.SegmentIndex < SegmentCount) + { + const SharedBuffer& Segment = m_Segments[It.SegmentIndex]; + size_t SegmentSize = Segment.GetSize(); + size_t CopySize = zen::Min(SegmentSize - It.OffsetInSegment, SizeLeft); + MemoryView ReadView = Segment.GetView(); + ReadView.RightChopInline(It.OffsetInSegment); + ReadView.LeftInline(CopySize); + WriteView = WriteView.CopyFrom(ReadView); + It.OffsetInSegment += CopySize; + ZEN_ASSERT_SLOW(It.OffsetInSegment <= SegmentSize); + if (It.OffsetInSegment == SegmentSize) + { + It.SegmentIndex++; + It.OffsetInSegment = 0; + } + SizeLeft -= CopySize; + } +} + void CompositeBuffer::CopyTo(MutableMemoryView Target, uint64_t Offset) const { diff --git a/zencore/compress.cpp b/zencore/compress.cpp index 35a5acb3a..15cc5f6a7 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -65,7 +65,8 @@ struct BufferHeader BufferHeader Header; if (sizeof(BufferHeader) <= CompressedData.GetSize()) { - CompressedData.CopyTo(MakeMutableMemoryView(&Header, &Header + 1)); + CompositeBuffer::Iterator It; + CompressedData.CopyTo(MakeMutableMemoryView(&Header, &Header + 1), It); Header.ByteSwap(); } return Header; @@ -235,10 +236,13 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize) { UniqueBuffer RawBlockCopy; MutableMemoryView CompressedBlocksView = CompressedData.GetMutableView() + sizeof(BufferHeader) + MetaSize; + + CompositeBuffer::Iterator It = RawData.GetIterator(0); + for (uint64_t RawOffset = 0; RawOffset < RawSize;) { const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); - const MemoryView RawBlock = RawData.ViewOrCopyRange(RawOffset, RawBlockSize, RawBlockCopy); + const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy); RawHash.Append(RawBlock); MutableMemoryView CompressedBlock = CompressedBlocksView; @@ -669,8 +673,10 @@ BufferHeader::IsValid(const CompositeBuffer& CompressedData) { if (const BaseDecoder* const Decoder = GetDecoder(Header.Method)) { - UniqueBuffer HeaderCopy; - const MemoryView HeaderView = CompressedData.ViewOrCopyRange(0, Decoder->GetHeaderSize(Header), HeaderCopy); + UniqueBuffer HeaderCopy = UniqueBuffer::Alloc(Decoder->GetHeaderSize(Header)); + CompositeBuffer::Iterator It; + CompressedData.CopyTo(HeaderCopy.GetMutableView(), It); + const MemoryView HeaderView = HeaderCopy.GetView(); if (Header.Crc32 == BufferHeader::CalculateCrc32(HeaderView)) { return true; @@ -851,6 +857,30 @@ CompressedBuffer::FromCompressed(SharedBuffer&& InCompressedData) return Local; } +CompressedBuffer +CompressedBuffer::FromCompressedNoValidate(IoBuffer&& InCompressedData) +{ + if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader)) + { + return CompressedBuffer(); + } + CompressedBuffer Local; + Local.CompressedData = CompositeBuffer(SharedBuffer(std::move(InCompressedData))); + return Local; +} + +CompressedBuffer +CompressedBuffer::FromCompressedNoValidate(CompositeBuffer&& InCompressedData) +{ + if (InCompressedData.GetSize() <= sizeof(detail::BufferHeader)) + { + return CompressedBuffer(); + } + CompressedBuffer Local; + Local.CompressedData = std::move(InCompressedData); + return Local; +} + uint64_t CompressedBuffer::GetRawSize() const { @@ -927,7 +957,9 @@ CompressedBuffer::DecompressToComposite() const } bool -CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, OodleCompressionLevel& OutCompressionLevel) const +CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, + OodleCompressionLevel& OutCompressionLevel, + uint64_t& OutBlockSize) const { using namespace detail; if (CompressedData) @@ -937,10 +969,12 @@ CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, Oodle case CompressionMethod::None: OutCompressor = OodleCompressor::NotSet; OutCompressionLevel = OodleCompressionLevel::None; + OutBlockSize = 0; return true; case CompressionMethod::Oodle: OutCompressor = OodleCompressor(Header.Compressor); OutCompressionLevel = OodleCompressionLevel(Header.CompressionLevel); + OutBlockSize = uint64_t(1) << Header.BlockSizeExponent; return true; default: break; diff --git a/zencore/include/zencore/compactbinarypackage.h b/zencore/include/zencore/compactbinarypackage.h index c3e587f40..16f723edc 100644 --- a/zencore/include/zencore/compactbinarypackage.h +++ b/zencore/include/zencore/compactbinarypackage.h @@ -64,8 +64,8 @@ public: ZENCORE_API explicit CbAttachment(CompositeBuffer&& InValue, const IoHash& Hash); /** Construct a compressed binary attachment. Value is cloned if not owned. */ - ZENCORE_API explicit CbAttachment(const CompressedBuffer& InValue); - ZENCORE_API explicit CbAttachment(CompressedBuffer&& InValue); + ZENCORE_API explicit CbAttachment(const CompressedBuffer& InValue, const IoHash& Hash); + ZENCORE_API explicit CbAttachment(CompressedBuffer&& InValue, const IoHash& Hash); /** Reset this to a null attachment. */ inline void Reset() { *this = CbAttachment(); } @@ -133,28 +133,8 @@ public: private: ZENCORE_API CbAttachment(const CbObject& Value, const IoHash* Hash); - struct CbObjectValue - { - CbObject Object; - IoHash Hash; - - CbObjectValue(const CbObject& InObject, const IoHash& InHash) : Object(InObject), Hash(InHash) {} - CbObjectValue(CbObject&& InObject, const IoHash& InHash) : Object(std::move(InObject)), Hash(InHash) {} - }; - - struct BinaryValue - { - CompositeBuffer Buffer; - IoHash Hash; - - BinaryValue(const CompositeBuffer& InBuffer) : Buffer(InBuffer.MakeOwned()), Hash(IoHash::HashBuffer(InBuffer)) {} - BinaryValue(const CompositeBuffer& InBuffer, const IoHash& InHash) : Buffer(InBuffer.MakeOwned()), Hash(InHash) {} - BinaryValue(CompositeBuffer&& InBuffer) : Buffer(std::move(InBuffer)), Hash(IoHash::HashBuffer(Buffer)) {} - BinaryValue(CompositeBuffer&& InBuffer, const IoHash& InHash) : Buffer(std::move(InBuffer)), Hash(InHash) {} - BinaryValue(SharedBuffer&& InBuffer, const IoHash& InHash) : Buffer(std::move(InBuffer)), Hash(InHash) {} - }; - - std::variant<std::nullptr_t, CbObjectValue, BinaryValue, CompressedBuffer> Value; + IoHash Hash; + std::variant<std::nullptr_t, CbObject, CompositeBuffer, CompressedBuffer> Value; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -300,6 +280,8 @@ public: /** Add the attachment to this package, along with any references that can be resolved. */ inline void AddAttachment(const CbAttachment& Attachment, AttachmentResolver Resolver) { AddAttachment(Attachment, &Resolver); } + void AddAttachments(std::span<const CbAttachment> Attachments); + /** * Remove an attachment by hash. * diff --git a/zencore/include/zencore/compositebuffer.h b/zencore/include/zencore/compositebuffer.h index 4a3b60428..4e4b4d002 100644 --- a/zencore/include/zencore/compositebuffer.h +++ b/zencore/include/zencore/compositebuffer.h @@ -99,6 +99,15 @@ public: uint64_t Size, std::function<void(MemoryView View, const SharedBuffer& ViewOuter)> Visitor) const; + struct Iterator + { + size_t SegmentIndex = 0; + uint64_t OffsetInSegment = 0; + }; + ZENCORE_API Iterator GetIterator(uint64_t Offset) const; + ZENCORE_API MemoryView ViewOrCopyRange(Iterator& It, uint64_t Size, UniqueBuffer& CopyBuffer) const; + ZENCORE_API void CopyTo(MutableMemoryView Target, Iterator& It) const; + /** A null composite buffer. */ static const CompositeBuffer Null; diff --git a/zencore/include/zencore/compress.h b/zencore/include/zencore/compress.h index d37ecfa79..92dc1fb76 100644 --- a/zencore/include/zencore/compress.h +++ b/zencore/include/zencore/compress.h @@ -75,6 +75,8 @@ public: [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(CompositeBuffer&& CompressedData); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(const SharedBuffer& CompressedData); [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressed(SharedBuffer&& CompressedData); + [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(IoBuffer&& CompressedData); + [[nodiscard]] ZENCORE_API static CompressedBuffer FromCompressedNoValidate(CompositeBuffer&& CompressedData); /** Reset this to null. */ inline void Reset() { CompressedData.Reset(); } @@ -89,8 +91,8 @@ public: [[nodiscard]] inline bool IsOwned() const { return CompressedData.IsOwned(); } /** Returns a copy of the compressed buffer that owns its underlying memory. */ - [[nodiscard]] inline CompressedBuffer MakeOwned() const& { return FromCompressed(CompressedData.MakeOwned()); } - [[nodiscard]] inline CompressedBuffer MakeOwned() && { return FromCompressed(std::move(CompressedData).MakeOwned()); } + [[nodiscard]] inline CompressedBuffer MakeOwned() const& { return FromCompressedNoValidate(CompressedData.MakeOwned()); } + [[nodiscard]] inline CompressedBuffer MakeOwned() && { return FromCompressedNoValidate(std::move(CompressedData).MakeOwned()); } /** Returns a composite buffer containing the compressed data. May be null. May not be owned. */ [[nodiscard]] inline const CompositeBuffer& GetCompressed() const& { return CompressedData; } @@ -117,7 +119,8 @@ public: * @return True if parameters were written, otherwise false. */ [[nodiscard]] ZENCORE_API bool TryGetCompressParameters(OodleCompressor& OutCompressor, - OodleCompressionLevel& OutCompressionLevel) const; + OodleCompressionLevel& OutCompressionLevel, + uint64_t& OutBlockSize) const; /** * Decompress into a memory view that is less or equal GetRawSize() bytes. diff --git a/zencore/include/zencore/stream.h b/zencore/include/zencore/stream.h index ec303e1f8..9e4996249 100644 --- a/zencore/include/zencore/stream.h +++ b/zencore/include/zencore/stream.h @@ -28,6 +28,7 @@ public: } inline void Write(MemoryView Memory) { Write(Memory.GetData(), Memory.GetSize()); } + void Write(std::initializer_list<const MemoryView> Buffers); inline uint64_t CurrentOffset() const { return m_Offset; } @@ -41,7 +42,6 @@ public: inline MutableMemoryView GetMutableView() { return MutableMemoryView(m_Buffer.data(), m_Offset); } private: - RwLock m_Lock; std::vector<uint8_t> m_Buffer; uint64_t m_Offset = 0; diff --git a/zencore/stream.cpp b/zencore/stream.cpp index 8faf90af2..3402e51be 100644 --- a/zencore/stream.cpp +++ b/zencore/stream.cpp @@ -11,10 +11,28 @@ namespace zen { void -BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset) +BinaryWriter::Write(std::initializer_list<const MemoryView> Buffers) { - RwLock::ExclusiveLockScope _(m_Lock); + size_t TotalByteCount = 0; + for (const MemoryView& View : Buffers) + { + TotalByteCount += View.GetSize(); + } + const size_t NeedEnd = m_Offset + TotalByteCount; + if (NeedEnd > m_Buffer.size()) + { + m_Buffer.resize(NeedEnd); + } + for (const MemoryView& View : Buffers) + { + memcpy(m_Buffer.data() + m_Offset, View.GetData(), View.GetSize()); + m_Offset += View.GetSize(); + } +} +void +BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset) +{ const size_t NeedEnd = Offset + ByteCount; if (NeedEnd > m_Buffer.size()) @@ -28,8 +46,6 @@ BinaryWriter::Write(const void* data, size_t ByteCount, uint64_t Offset) void BinaryWriter::Reset() { - RwLock::ExclusiveLockScope _(m_Lock); - m_Buffer.clear(); m_Offset = 0; } @@ -41,6 +57,18 @@ BinaryWriter::Reset() #if ZEN_WITH_TESTS +TEST_CASE("binary.writer.span") +{ + BinaryWriter Writer; + const MemoryView View1("apa", 3); + const MemoryView View2(" ", 1); + const MemoryView View3("banan", 5); + Writer.Write({View1, View2, View3}); + MemoryView Result = Writer.GetView(); + CHECK(Result.GetSize() == 9); + CHECK(memcmp(Result.GetData(), "apa banan", 9) == 0); +} + void stream_forcelink() { diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index a7dca5441..e2f061a87 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -17,6 +17,10 @@ #include <span> #include <vector> +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + namespace zen { std::vector<IoBuffer> @@ -93,10 +97,12 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) ResponseBuffers.push_back(std::move(RefBuffer)); }; - auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, - bool DenyPartialLocalReferences, - CbAttachmentReferenceHeader& LocalRef, - std::string& Path8) -> bool { + tsl::robin_map<void*, std::string> FileNameMap; + + auto IsLocalRef = [&FileNameMap](const CompositeBuffer& AttachmentBinary, + bool DenyPartialLocalReferences, + CbAttachmentReferenceHeader& LocalRef, + std::string& Path8) -> bool { const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); IoBufferFileReference Ref; const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); @@ -111,9 +117,17 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) return false; } - ExtendablePathBuilder<256> LocalRefFile; - LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); - Path8 = LocalRefFile.ToUtf8(); + if (auto It = FileNameMap.find(Ref.FileHandle); It != FileNameMap.end()) + { + Path8 = It->second; + } + else + { + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + FileNameMap.insert_or_assign(Ref.FileHandle, Path8); + } LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size()); LocalRef.PayloadByteOffset = Ref.FileChunkOffset; @@ -131,7 +145,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); - IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + IoHash AttachmentHash = Attachment.GetHash(); // If the data is either not backed by a file, or there are multiple // fragments then we cannot marshal it by local reference. We might @@ -158,7 +172,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + .AttachmentHash = AttachmentHash}; for (const SharedBuffer& Segment : Compressed.GetSegments()) { @@ -177,7 +191,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + IoHash AttachmentHash = Attachment.GetHash(); bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); @@ -211,6 +225,7 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) ZEN_NOT_IMPLEMENTED("Unknown attachment kind"); } } + FileNameMap.clear(); return ResponseBuffers; } @@ -262,6 +277,11 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint CbPackage Package; + std::vector<CbAttachment> Attachments; + Attachments.reserve(ChunkCount); // Guessing here... + + tsl::robin_map<std::string, IoBuffer> PartialFileBuffers; + for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; @@ -283,23 +303,18 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)}; - if (IoBuffer ChunkReference = - IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) + IoBuffer FullFileBuffer; + if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end()) { - CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); - if (!CompBuf) - { - throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})", - i, - PathToUtf8(Path), - AttachRefHdr->PayloadByteOffset, - AttachRefHdr->PayloadByteSize)); - } - CbAttachment Attachment(std::move(CompBuf)); - Package.AddAttachment(Attachment); + FullFileBuffer = It->second; } else { + FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second; + } + + if (!FullFileBuffer) + { // Unable to open chunk reference throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", i, @@ -307,6 +322,21 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } + + IoBuffer ChunkReference = AttachRefHdr->PayloadByteOffset == 0 && AttachRefHdr->PayloadByteSize == FullFileBuffer.GetSize() + ? FullFileBuffer + : IoBuffer(FullFileBuffer, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + + CompressedBuffer CompBuf(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkReference))); + if (!CompBuf) + { + throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})", + i, + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); } else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { @@ -341,8 +371,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint throw std::runtime_error(fmt::format("invalid format for chunk #{} expected compressed buffer for attachment", i)); } - CbAttachment Attachment(std::move(CompBuf)); - Package.AddAttachment(Attachment); + Attachments.emplace_back(CbAttachment(std::move(CompBuf), Entry.AttachmentHash)); } } else /* not compressed */ @@ -367,10 +396,13 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView()); CbAttachment Attachment(SharedBuffer{AttachmentBufferCopy}); - Package.AddAttachment(Attachment); + Attachments.emplace_back(SharedBuffer{AttachmentBufferCopy}); } } } + PartialFileBuffers.clear(); + + Package.AddAttachments(Attachments); return Package; } @@ -517,12 +549,13 @@ CbPackageReader::Finalize() if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { - m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); + m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)), Entry.AttachmentHash)); } else { - m_Attachments.push_back(CbAttachment( - CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); + CompressedBuffer Compressed = + CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None); + m_Attachments.push_back(CbAttachment(std::move(Compressed), IoHash::FromBLAKE3(Compressed.GetRawHash()))); } } diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 82b770e3c..1d0596c29 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -523,7 +523,8 @@ TEST_CASE("project.basic") { uint8_t AttachData[] = {1, 2, 3}; - zen::CbAttachment Attach{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}))}; + zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3})); + zen::CbAttachment Attach{Attachment, IoHash::FromBLAKE3(Attachment.GetRawHash())}; zen::CbObjectWriter OpWriter; OpWriter << "key" @@ -773,7 +774,7 @@ TEST_CASE("zcache.cbpackage") zen::CbPackage Package; Package.SetObject(Obj.Save().AsObject()); - Package.AddAttachment(zen::CbAttachment(CompressedData)); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); return Package; }; @@ -999,7 +1000,7 @@ TEST_CASE("zcache.policy") zen::CbPackage Package; Package.SetObject(CacheRecord); - Package.AddAttachment(zen::CbAttachment(CompressedData)); + Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey)); return Package; }; @@ -2655,12 +2656,14 @@ TEST_CASE("http.package") static const uint8_t Data1[] = {0, 1, 2, 3}; static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8}; - zen::CbAttachment Attach1{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), - zen::OodleCompressor::NotSet, - zen::OodleCompressionLevel::None)}; - zen::CbAttachment Attach2{zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), - zen::OodleCompressor::NotSet, - zen::OodleCompressionLevel::None)}; + zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}), + zen::OodleCompressor::NotSet, + zen::OodleCompressionLevel::None); + zen::CbAttachment Attach1{AttachmentData1, IoHash::FromBLAKE3(AttachmentData1.GetRawHash())}; + zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}), + zen::OodleCompressor::NotSet, + zen::OodleCompressionLevel::None); + zen::CbAttachment Attach2{AttachmentData2, IoHash::FromBLAKE3(AttachmentData2.GetRawHash())}; zen::CbObjectWriter Writer; diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index c5beef4b3..dfb69c0fe 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -530,7 +530,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + Package.AddAttachment( + CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), AttachmentHash.AsHash())); } else { @@ -681,7 +682,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + Package.AddAttachment( + CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash())); Count.Valid++; } } @@ -1605,7 +1607,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { - ResponsePackage.AddAttachment(CbAttachment(Value.Payload)); + ResponsePackage.AddAttachment(CbAttachment(Value.Payload, IoHash::FromBLAKE3(Value.Payload.GetRawHash()))); } } @@ -1959,10 +1961,11 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http const CompressedBuffer& Result = Request.Result; if (Result) { - ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash())); + IoHash Hash = IoHash::FromBLAKE3(Result.GetRawHash()); + ResponseObject.AddHash("RawHash"sv, Hash); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Result)); + RpcResponse.AddAttachment(CbAttachment(Result, Hash)); } else { @@ -2475,7 +2478,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept Writer.AddHash("RawHash"sv, Request.Key->ChunkId); if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { - RpcResponse.AddAttachment(CbAttachment(Request.Value)); + RpcResponse.AddAttachment(CbAttachment(Request.Value, Request.Key->ChunkId)); } else { diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index d46b312d3..6940583d1 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> +#include <zencore/compactbinaryvalue.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -15,6 +16,8 @@ #include <zencore/testutils.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpshared.h> #include <zenstore/caslog.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> @@ -61,7 +64,7 @@ namespace { // We can't move our folder, probably because it is busy, bail.. return false; } - zen::Sleep(100); + Sleep(100); } while (true); } } // namespace @@ -77,7 +80,7 @@ OpKeyStringAsOId(std::string_view OpKey) Writer << "key"sv << OpKey; XXH3_128Stream KeyHasher; - Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + Writer.Save()["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); Oid OpId; @@ -142,7 +145,7 @@ struct ProjectStore::OplogStorage : public RefCounted uint64_t InvalidEntries = 0; m_Oplog.Replay( - [&](const zen::OplogEntry& LogEntry) { + [&](const OplogEntry& LogEntry) { if (LogEntry.OpCoreSize == 0) { ++InvalidEntries; @@ -209,6 +212,8 @@ struct ProjectStore::OplogStorage : public RefCounted { ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); + using namespace std::literals; + SharedBuffer Buffer = Op.GetBuffer(); const uint64_t WriteSize = Buffer.GetSize(); const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); @@ -216,7 +221,7 @@ struct ProjectStore::OplogStorage : public RefCounted ZEN_ASSERT(WriteSize != 0); XXH3_128Stream KeyHasher; - Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); XXH3_128 KeyHash = KeyHasher.GetHash(); RwLock::ExclusiveLockScope _(m_RwLock); @@ -275,7 +280,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, Project* Project, CidStore& Stor m_TempPath = m_BasePath / "temp"sv; - zen::CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath); } ProjectStore::Oplog::~Oplog() @@ -476,6 +481,11 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, return false; } + if (Hash != IoHash::Zero) + { + m_ChunkMap.insert_or_assign(FileId, Hash); + } + FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; @@ -599,9 +609,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) using namespace std::literals; - const CbObject& Core = OpPackage.GetObject(); - const OplogEntry OpEntry = m_Storage->AppendOp(Core); - // Persist attachments uint64_t AttachmentBytes = 0; @@ -624,12 +631,25 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) AttachmentBytes += AttachmentSize; } - const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); + const CbObject& Core = OpPackage.GetObject(); + const uint32_t EntryId = AppendNewOplogEntry(Core); + + ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); + + return EntryId; +} + +uint32_t +ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", - EntryId, - zen::NiceBytes(NewAttachmentBytes), - zen::NiceBytes(AttachmentBytes)); + ZEN_ASSERT(m_Storage); + + using namespace std::literals; + + const OplogEntry OpEntry = m_Storage->AppendOp(Core); + const uint32_t EntryId = RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry); return EntryId; } @@ -672,11 +692,11 @@ ProjectStore::Project::Read() { CbObject Cfg = LoadCompactBinaryObject(Obj); - Identifier = Cfg["id"].AsString(); - RootDir = Cfg["root"].AsString(); - ProjectRootDir = Cfg["project"].AsString(); - EngineRootDir = Cfg["engine"].AsString(); - ProjectFilePath = Cfg["projectfile"].AsString(); + Identifier = Cfg["id"sv].AsString(); + RootDir = Cfg["root"sv].AsString(); + ProjectRootDir = Cfg["project"sv].AsString(); + EngineRootDir = Cfg["engine"sv].AsString(); + ProjectFilePath = Cfg["projectfile"sv].AsString(); } else { @@ -930,7 +950,7 @@ ProjectStore::Project::IsExpired() const ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcManager& Gc) : GcStorage(Gc) , GcContributor(Gc) -, m_Log(zen::logging::Get("project")) +, m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectBasePath(BasePath) { @@ -1245,6 +1265,251 @@ ProjectStore::Exists(std::string_view ProjectId) return Project::Exists(BasePathForProject(ProjectId)); } +CbArray +ProjectStore::GetProjectsList() +{ + using namespace std::literals; + + DiscoverProjects(); + + CbWriter Response; + Response.BeginArray(); + + IterateProjects([&Response](ProjectStore::Project& Prj) { + Response.BeginObject(); + Response << "Id"sv << Prj.Identifier; + Response << "RootDir"sv << Prj.RootDir.string(); + Response << "ProjectRootDir"sv << Prj.ProjectRootDir; + Response << "EngineRootDir"sv << Prj.EngineRootDir; + Response << "ProjectFilePath"sv << Prj.ProjectFilePath; + Response.EndObject(); + }); + Response.EndArray(); + return Response.Save().AsArray(); +} + +HttpResponseCode +ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, bool FilterClient, CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Project file request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Project file for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + + FoundLog->IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { + Response.BeginObject(); + Response << "id"sv << Id; + Response << "clientpath"sv << ClientPath; + if (!FilterClient) + { + Response << "serverpath"sv << ServerPath; + } + Response.EndObject(); + }); + + Response.EndArray(); + OutPayload = Response.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload) +{ + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Chunk info request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + ZEN_INFO("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId); + return HttpResponseCode::BadRequest; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpResponseCode::NotFound; + } + + uint64_t ChunkSize = Chunk.GetSize(); + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + ChunkSize = Compressed.GetRawSize(); + } + + CbObjectWriter Response; + Response << "size"sv << ChunkSize; + OutPayload = Response.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ + bool IsOffset = Offset != 0 || Size != ~(0ull); + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + ZEN_INFO("Chunk request for unknown project '{}'", ProjectId); + return HttpResponseCode::NotFound; + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + ZEN_INFO("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId); + return HttpResponseCode::NotFound; + } + + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + ZEN_INFO("Chunk request for invalid chunk id '{}/{}/{}'", ProjectId, OplogId, ChunkId); + return HttpResponseCode::BadRequest; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + IoBuffer Chunk = FoundLog->FindChunk(Obj); + if (!Chunk) + { + ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); + return HttpResponseCode::NotFound; + } + + OutChunk = Chunk; + HttpContentType ContentType = Chunk.GetContentType(); + + if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + ZEN_ASSERT(!Compressed.IsNull()); + + if (IsOffset) + { + uint64_t RawSize = Compressed.GetRawSize(); + if ((Offset + Size) > RawSize) + { + Size = RawSize - Offset; + } + + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress(Offset, Size).AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + // Value will be a range of compressed blocks that covers the requested range + // The client will have to compensate for any offsets that do not land on an even block size multiple + OutChunk = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + else + { + if (AcceptType == HttpContentType::kBinary) + { + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk = Compressed.GetCompressed().Flatten().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + } + } + else if (IsOffset) + { + if ((Offset + Size) > Chunk.GetSize()) + { + Size = Chunk.GetSize() - Offset; + } + OutChunk = IoBuffer(Chunk, Offset, Size); + OutChunk.SetContentType(ContentType); + } + + ZEN_DEBUG("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); + + return HttpResponseCode::OK; +} + +HttpResponseCode +ProjectStore::GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk) +{ + using namespace std::literals; + + if (Cid.length() != IoHash::StringLength) + { + ZEN_INFO("Chunk request for invalid chunk hash '{}'", Cid); + return HttpResponseCode::BadRequest; + } + + const IoHash Hash = IoHash::FromHexString(Cid); + OutChunk = m_CidStore.FindChunkByCid(Hash); + + if (!OutChunk) + { + ZEN_DEBUG("chunk - '{}' MISSING", Cid); + return HttpResponseCode::NotFound; + } + + if (AcceptType == HttpContentType::kBinary) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(OutChunk)); + OutChunk = Compressed.Decompress().AsIoBuffer(); + OutChunk.SetContentType(HttpContentType::kBinary); + } + else + { + OutChunk.SetContentType(HttpContentType::kCompressedBinary); + } + return HttpResponseCode::OK; +} + ////////////////////////////////////////////////////////////////////////// HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) @@ -1264,34 +1529,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) // currently not possible for (arbitrary, external) technical reasons m_Router.RegisterRoute( "list", - [this](HttpRouterRequest& Req) { - m_ProjectStore->DiscoverProjects(); - - CbWriter Response; - Response.BeginArray(); - - m_ProjectStore->IterateProjects([&Response](ProjectStore::Project& Prj) { - Response.BeginObject(); - Response << "Id"sv << Prj.Identifier; - Response << "RootDir"sv << Prj.RootDir.string(); - Response << "ProjectRootDir"sv << Prj.ProjectRootDir; - Response << "EngineRootDir"sv << Prj.EngineRootDir; - Response << "ProjectFilePath"sv << Prj.ProjectFilePath; - Response.EndObject(); - }); - Response.EndArray(); - - Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Response.Save().AsArray()); - }, + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_ProjectStore->GetProjectsList()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{project}/oplog/{log}/batch", [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) @@ -1412,7 +1658,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); ResponsePtr += sizeof(ResponseChunk); } - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, OutBlobs); }, HttpVerb::kPost); @@ -1427,42 +1672,17 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& ProjectId = Req.GetCapture(1); const auto& OplogId = Req.GetCapture(2); - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); const bool FilterClient = Params.GetValue("filter"sv) == "client"sv; - CbObjectWriter Response; - Response.BeginArray("files"); - - Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { - Response.BeginObject(); - Response << "id"sv << Id; - Response << "clientpath"sv << ClientPath; - if (!FilterClient) - { - Response << "serverpath"sv << ServerPath; - } - Response.EndObject(); - }); - - Response.EndArray(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + CbObject ResponsePayload; + HttpResponseCode Response = m_ProjectStore->GetProjectFiles(ProjectId, OplogId, FilterClient, ResponsePayload); + if (Response != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(Response); + } + return HttpReq.WriteResponse(Response, ResponsePayload); }, HttpVerb::kGet); @@ -1475,41 +1695,13 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - - const Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = Log.FindChunk(Obj); - if (!Chunk) - { - m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - uint64_t ChunkSize = Chunk.GetSize(); - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) + CbObject ResponsePayload; + HttpResponseCode Response = m_ProjectStore->GetChunkInfo(ProjectId, OplogId, ChunkId, ResponsePayload); + if (Response != HttpResponseCode::OK) { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - ZEN_ASSERT(!Compressed.IsNull()); - ChunkSize = Compressed.GetRawSize(); + return HttpReq.WriteResponse(Response); } - - CbObjectWriter Response; - Response << "size"sv << ChunkSize; - HttpReq.WriteResponse(HttpResponseCode::OK, Response.Save()); + HttpReq.WriteResponse(HttpResponseCode::OK, ResponsePayload); }, HttpVerb::kGet); @@ -1522,9 +1714,8 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) const auto& OplogId = Req.GetCapture(2); const auto& ChunkId = Req.GetCapture(3); - bool IsOffset = false; - uint64_t Offset = 0; - uint64_t Size = ~(0ull); + uint64_t Offset = 0; + uint64_t Size = ~(0ull); auto QueryParms = Req.ServerRequest().GetQueryParams(); @@ -1532,8 +1723,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) { - Offset = OffsetVal.value(); - IsOffset = true; + Offset = OffsetVal.value(); } else { @@ -1545,8 +1735,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) { if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) { - Size = SizeVal.value(); - IsOffset = true; + Size = SizeVal.value(); } else { @@ -1555,87 +1744,16 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) } HttpContentType AcceptType = HttpReq.AcceptContentType(); - if (AcceptType == HttpContentType::kUnknownContentType) - { - AcceptType = HttpContentType::kBinary; - } - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - m_Log.warn("chunk - '{}/{}/{}' FAILED, missing project", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - m_Log.warn("chunk - '{}/{}/{}' FAILED, missing oplog", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog& Log = *FoundLog; - Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = Log.FindChunk(Obj); - if (!Chunk) - { - m_Log.debug("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, ChunkId); - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - IoBuffer Value = Chunk; - HttpContentType ContentType = Chunk.GetContentType(); - - if (Chunk.GetContentType() == HttpContentType::kCompressedBinary) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); - ZEN_ASSERT(!Compressed.IsNull()); - - if (IsOffset) - { - if ((Offset + Size) > Compressed.GetRawSize()) - { - Size = Compressed.GetRawSize() - Offset; - } - if (AcceptType == HttpContentType::kBinary) - { - Value = Compressed.Decompress(Offset, Size).AsIoBuffer(); - ContentType = HttpContentType::kBinary; - } - else - { - Value = Compressed.CopyRange(Offset, Size).GetCompressed().Flatten().AsIoBuffer(); - ContentType = HttpContentType::kCompressedBinary; - } - } - else - { - if (AcceptType == HttpContentType::kBinary) - { - Value = Compressed.Decompress().AsIoBuffer(); - ContentType = HttpContentType::kBinary; - } - else - { - Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); - ContentType = HttpContentType::kCompressedBinary; - } - } - } - else if (IsOffset) + IoBuffer Chunk; + HttpResponseCode Response = m_ProjectStore->GetChunk(ProjectId, OplogId, ChunkId, Offset, Size, AcceptType, Chunk); + if (Response != HttpResponseCode::OK) { - if ((Offset + Size) > Chunk.GetSize()) - { - Size = Chunk.GetSize() - Offset; - } - Value = IoBuffer(Chunk, Offset, Size); + return HttpReq.WriteResponse(Response); } - m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(ContentType)); - return HttpReq.WriteResponse(HttpResponseCode::OK, ContentType, Value); + m_Log.debug("chunk - '{}/{}/{}' '{}'", ProjectId, OplogId, ChunkId, ToString(Chunk.GetContentType())); + return HttpReq.WriteResponse(HttpResponseCode::OK, Chunk.GetContentType(), Chunk); }, HttpVerb::kGet | HttpVerb::kHead); @@ -1814,7 +1932,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) ZEN_WARN("Received malformed package! Saving payload to '{}'", BadPackagePath); - zen::WriteFile(BadPackagePath, Payload); + WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } @@ -1872,7 +1990,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) ProjectStore::Oplog& Oplog = *FoundLog; - if (const std::optional<int32_t> OpId = zen::ParseInt<uint32_t>(OpIdString)) + if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) { if (std::optional<CbObject> MaybeOp = Oplog.GetOpByIndex(OpId.value())) { @@ -1910,7 +2028,7 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) case ZenContentType::kCompressedBinary: if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload))) { - Package.AddAttachment(CbAttachment(Compressed)); + Package.AddAttachment(CbAttachment(Compressed, AttachmentHash)); } else { @@ -1942,6 +2060,196 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) HttpVerb::kGet); m_Router.RegisterRoute( + "{project}/oplog/{log}/archive", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); + if (!Project) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + + if (!FoundLog) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + + switch (Req.ServerRequest().RequestVerb()) + { + case HttpVerb::kGet: + { + CbObjectWriter Response; + Response.BeginArray("entries"sv); + std::unordered_set<IoHash> AttachementHashes; + size_t OpCount = 0; + IoHashStream Hasher; + + FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { + SharedBuffer Buffer = Op.GetBuffer(); + Hasher.Append(Buffer.GetView()); + Response << Op; + Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsAttachment(); + AttachementHashes.emplace(AttachmentHash); + }); + OpCount++; + }); + Response.EndArray(); + + IoHash Checksum = Hasher.GetHash(); + Response.AddHash("checksum"sv, Checksum); + + ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", + OpCount, + AttachementHashes.size(), + ProjectId, + OplogId, + Checksum); + + CbPackage ResponsePackage; + ResponsePackage.SetObject(Response.Save()); + + std::vector<CbAttachment> Attachments; + Attachments.reserve(AttachementHashes.size()); + for (const IoHash& AttachmentHash : AttachementHashes) + { + IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + if (Payload) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); + ZEN_ASSERT(Compressed); + Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); + } + } + ResponsePackage.AddAttachments(Attachments); + + std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); + const ZenContentType AcceptType = HttpReq.AcceptContentType(); + if (AcceptType == ZenContentType::kCompressedBinary) + { + std::vector<SharedBuffer> Parts; + Parts.reserve(ResponsePayload.size()); + for (const auto& I : ResponsePayload) + { + Parts.emplace_back(SharedBuffer(I)); + } + CompositeBuffer Cmp(std::move(Parts)); + CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); + HttpReq.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCompressedBinary, + CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); + } + else if (AcceptType == ZenContentType::kCbPackage) + { + HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + } + break; + case HttpVerb::kPost: + { + ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); + IoBuffer CompressedPayload = HttpReq.ReadPayload(); + IoBuffer Payload = CompressedBuffer::FromCompressed(SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); + + CbPackage RequestPackage = ParsePackageMessage(Payload); + CbObject Request = RequestPackage.GetObject(); + IoHash Checksum = Request["checksum"sv].AsHash(); + + std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments(); + zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); + + ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", + Entries.Num(), + Attachments.size(), + Checksum, + ProjectId, + OplogId); + std::vector<CbObject> Ops; + Ops.reserve(Entries.Num()); + IoHashStream Hasher; + for (auto& OpEntry : Entries) + { + CbObjectView Core = OpEntry.AsObjectView(); + + if (!Core["key"sv]) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "No oplog entry key specified"); + } + + BinaryWriter Writer; + Core.CopyTo(Writer); + MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); + CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); + Ops.emplace_back(Op); + } + IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); + + // Spread over multiple threads? + WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); + std::atomic_int64_t JobCount = 0; + for (const CbAttachment& Attachment : Attachments) + { + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() { + CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); + JobCount.fetch_add(-1); + }); + } + while (JobCount.load()) + { + Sleep(1); + } + + ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); + for (const CbObject& Op : Ops) + { + const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); + ZEN_DEBUG("oplog entry #{}", OpLsn); + + if (OpLsn == ProjectStore::Oplog::kInvalidOp) + { + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", + ProjectId, + OplogId, + OpLsn, + NiceBytes(Op.GetSize()), + Op["key"sv].AsString()); + } + ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); + return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); + } + break; + default: + break; + } + }, + HttpVerb::kPost | HttpVerb::kGet); + m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); @@ -2120,7 +2428,11 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) std::vector<std::string> OpLogs = Project->ScanForOplogs(); CbObjectWriter Response; - Response << "id"sv << Project->Identifier << "root"sv << PathToUtf8(Project->RootDir); + Response << "id"sv << Project->Identifier; + Response << "root"sv << PathToUtf8(Project->RootDir); + Response << "engine"sv << PathToUtf8(Project->EngineRootDir); + Response << "project"sv << PathToUtf8(Project->ProjectRootDir); + Response << "projectfile"sv << PathToUtf8(Project->ProjectFilePath); Response.BeginArray("oplogs"sv); for (const std::string& OplogId : OpLogs) @@ -2190,24 +2502,32 @@ HttpProjectService::HandleRequest(HttpServerRequest& Request) namespace testutils { using namespace std::literals; - CbPackage CreateOplogPackage(const Oid& Id, const std::span<const CompressedBuffer>& Attachments) + std::string OidAsString(const Oid& Id) + { + StringBuilder<25> OidStringBuilder; + Id.ToString(OidStringBuilder); + return OidStringBuilder.ToString(); + } + + CbPackage CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments) { CbPackage Package; CbObjectWriter Object; - Object << "key"sv << Id; + Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("bulkdata"); - for (const CompressedBuffer& Attachment : Attachments) + for (const auto& Attachment : Attachments) { + CbAttachment Attach(Attachment.second, IoHash::FromBLAKE3(Attachment.second.GetRawHash())); Object.BeginObject(); - Object << "id" << Oid::NewOid(); - Object << "type" - << "Standard"; - Object << "data" << CbAttachment(Attachment); + Object << "id"sv << Attachment.first; + Object << "type"sv + << "Standard"sv; + Object << "data"sv << Attach; Object.EndObject(); - Package.AddAttachment(CbAttachment(Attachment)); + Package.AddAttachment(Attach); } Object.EndArray(); } @@ -2215,9 +2535,9 @@ namespace testutils { return Package; }; - std::vector<CompressedBuffer> CreateAttachments(const std::span<const size_t>& Sizes) + std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes) { - std::vector<CompressedBuffer> Result; + std::vector<std::pair<Oid, CompressedBuffer>> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { @@ -2227,12 +2547,28 @@ namespace testutils { { Data[Idx] = Idx % 255; } - - Result.emplace_back(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()))); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); + Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed)); } return Result; } + uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + { + if (RawOffset > 0) + { + uint64 BlockSize = 0; + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + if (!Buffer.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + return 0; + } + return BlockSize > 0 ? RawOffset % BlockSize : 0; + } + return 0; + } + } // namespace testutils TEST_CASE("project.store.create") @@ -2391,6 +2727,91 @@ TEST_CASE("project.store.gc") } } +TEST_CASE("project.store.partial.read") +{ + using namespace std::literals; + using namespace testutils; + + ScopedTemporaryDirectory TempDir; + + GcManager Gc; + CidStore CidStore(Gc); + CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + CidStore.Initialize(CidConfig); + + std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; + ProjectStore ProjectStore(CidStore, BasePath, Gc); + std::filesystem::path RootDir = TempDir.Path() / "root"sv; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; + + std::filesystem::path Project1RootDir = TempDir.Path() / "game1"sv; + std::filesystem::path Project1FilePath = TempDir.Path() / "game1"sv / "game.uproject"sv; + { + CreateDirectories(Project1FilePath.parent_path()); + BasicFile ProjectFile; + ProjectFile.Open(Project1FilePath, BasicFile::Mode::kTruncate); + } + + std::vector<Oid> OpIds; + OpIds.insert(OpIds.end(), {Oid::NewOid(), Oid::NewOid(), Oid::NewOid(), Oid::NewOid()}); + std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments; + { + Ref<ProjectStore::Project> Project1(ProjectStore.NewProject(BasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + Project1RootDir.string(), + Project1FilePath.string())); + ProjectStore::Oplog* Oplog = Project1->NewOplog("oplog1"sv); + CHECK(Oplog != nullptr); + Attachments[OpIds[0]] = {}; + Attachments[OpIds[1]] = CreateAttachments(std::initializer_list<size_t>{77}); + Attachments[OpIds[2]] = CreateAttachments(std::initializer_list<size_t>{7123, 9583, 690, 99}); + Attachments[OpIds[3]] = CreateAttachments(std::initializer_list<size_t>{55, 122}); + for (auto It : Attachments) + { + Oplog->AppendNewOplogEntry(CreateOplogPackage(It.first, It.second)); + } + } + { + IoBuffer Chunk; + CHECK(ProjectStore.GetChunk(IoHash::FromBLAKE3(Attachments[OpIds[1]][0].second.GetRawHash()).ToHexString(), + HttpContentType::kCompressedBinary, + Chunk) == HttpResponseCode::OK); + CompressedBuffer Attachment = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + CHECK(Attachment.GetRawSize() == Attachments[OpIds[1]][0].second.GetRawSize()); + } + + IoBuffer ChunkResult; + CHECK(ProjectStore.GetChunk("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 0, + ~0ull, + HttpContentType::kCompressedBinary, + ChunkResult) == HttpResponseCode::OK); + CHECK(ChunkResult); + CHECK(CompressedBuffer::FromCompressed(SharedBuffer(ChunkResult)).GetRawSize() == Attachments[OpIds[2]][1].second.GetRawSize()); + + IoBuffer PartialChunkResult; + CHECK(ProjectStore.GetChunk("proj1"sv, + "oplog1"sv, + OidAsString(Attachments[OpIds[2]][1].first), + 5, + 1773, + HttpContentType::kCompressedBinary, + PartialChunkResult) == HttpResponseCode::OK); + CHECK(PartialChunkResult); + CompressedBuffer PartialCompressedResult = CompressedBuffer::FromCompressed(SharedBuffer(PartialChunkResult)); + CHECK(PartialCompressedResult.GetRawSize() >= 1773); + + uint64_t RawOffsetInPartialCompressed = GetCompressedOffset(PartialCompressedResult, 5); + SharedBuffer PartialDecompressed = PartialCompressedResult.Decompress(RawOffsetInPartialCompressed); + SharedBuffer FullDecompressed = Attachments[OpIds[2]][1].second.Decompress(); + const uint8_t* FullDataPtr = &(reinterpret_cast<const uint8_t*>(FullDecompressed.GetView().GetData())[5]); + const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData()); + CHECK(FullDataPtr[0] == PartialDataPtr[0]); +} #endif void diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 00fb613d9..c62e7840d 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -90,6 +90,8 @@ public: */ uint32_t AppendNewOplogEntry(CbPackage Op); + uint32_t AppendNewOplogEntry(CbObject Core); + enum UpdateType { kUpdateNewEntry, @@ -222,6 +224,24 @@ public: virtual void CollectGarbage(GcContext& GcCtx) override; virtual GcStorageSize StorageSize() const override; + CbArray GetProjectsList(); + HttpResponseCode GetProjectFiles(const std::string_view ProjectId, + const std::string_view OplogId, + bool FilterClient, + CbObject& OutPayload); + HttpResponseCode GetChunkInfo(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + CbObject& OutPayload); + HttpResponseCode GetChunk(const std::string_view ProjectId, + const std::string_view OplogId, + const std::string_view ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk); + HttpResponseCode GetChunk(const std::string_view Cid, ZenContentType AcceptType, IoBuffer& OutChunk); + private: spdlog::logger& m_Log; CidStore& m_CidStore; diff --git a/zenserver/upstream/hordecompute.cpp b/zenserver/upstream/hordecompute.cpp index 22b06d9c4..988386726 100644 --- a/zenserver/upstream/hordecompute.cpp +++ b/zenserver/upstream/hordecompute.cpp @@ -999,7 +999,7 @@ namespace detail { ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); ApplyResult.TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); - CbAttachment Attachment(AttachmentBuffer); + CbAttachment Attachment(AttachmentBuffer, DecompressedId); OutputPackage.AddAttachment(Attachment); }); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index f9673bcd6..bc06653b9 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -253,7 +253,7 @@ namespace detail { if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { - Package.AddAttachment(CbAttachment(Chunk)); + Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); } else { @@ -337,7 +337,7 @@ namespace detail { { if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) { - Package.AddAttachment(CbAttachment(Chunk)); + Package.AddAttachment(CbAttachment(Chunk, AttachmentHash.AsHash())); } } }); @@ -1254,7 +1254,7 @@ namespace detail { { if (CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(Value))) { - Package.AddAttachment(CbAttachment(AttachmentBuffer)); + Package.AddAttachment(CbAttachment(AttachmentBuffer, IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()))); } else { @@ -1312,8 +1312,9 @@ namespace detail { } BatchWriter.EndObject(); // Policy unspecified and expected to be Default - BatchWriter.AddBinaryAttachment("RawHash"sv, IoHash::FromBLAKE3(Compressed.GetRawHash())); - BatchPackage.AddAttachment(CbAttachment(Compressed)); + IoHash Hash = IoHash::FromBLAKE3(Compressed.GetRawHash()); + BatchWriter.AddBinaryAttachment("RawHash"sv, Hash); + BatchPackage.AddAttachment(CbAttachment(Compressed, Hash)); } BatchWriter.EndObject(); } diff --git a/zenstore/cas.cpp b/zenstore/cas.cpp index f8fc41341..fdec78c60 100644 --- a/zenstore/cas.cpp +++ b/zenstore/cas.cpp @@ -47,7 +47,7 @@ public: virtual ~CasImpl(); virtual void Initialize(const CidStoreConfiguration& InConfig) override; - virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) override; + virtual CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) override; virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual bool ContainsChunk(const IoHash& ChunkHash) override; virtual void FilterChunks(HashKeySet& InOutChunks) override; @@ -191,7 +191,7 @@ CasImpl::UpdateManifest() } CasStore::InsertResult -CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, InsertMode Mode) { ZEN_TRACE_CPU("CAS::InsertChunk"); @@ -208,7 +208,7 @@ CasImpl::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) return m_SmallStrategy.InsertChunk(Chunk, ChunkHash); } - return m_LargeStrategy.InsertChunk(Chunk, ChunkHash); + return m_LargeStrategy.InsertChunk(Chunk, ChunkHash, Mode); } IoBuffer diff --git a/zenstore/cas.h b/zenstore/cas.h index 2ad160d28..9c48d4707 100644 --- a/zenstore/cas.h +++ b/zenstore/cas.h @@ -39,15 +39,21 @@ public: bool New = false; }; - virtual void Initialize(const CidStoreConfiguration& Config) = 0; - virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash) = 0; - virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; - virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; - virtual void FilterChunks(HashKeySet& InOutChunks) = 0; - virtual void Flush() = 0; - virtual void Scrub(ScrubContext& Ctx) = 0; - virtual void GarbageCollect(GcContext& GcCtx) = 0; - virtual CidStoreSize TotalSize() const = 0; + enum class InsertMode + { + kCopyOnly, + kMayBeMovedInPlace + }; + + virtual void Initialize(const CidStoreConfiguration& Config) = 0; + virtual InsertResult InsertChunk(IoBuffer Data, const IoHash& ChunkHash, InsertMode Mode = InsertMode::kMayBeMovedInPlace) = 0; + virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; + virtual bool ContainsChunk(const IoHash& ChunkHash) = 0; + virtual void FilterChunks(HashKeySet& InOutChunks) = 0; + virtual void Flush() = 0; + virtual void Scrub(ScrubContext& Ctx) = 0; + virtual void GarbageCollect(GcContext& GcCtx) = 0; + virtual CidStoreSize TotalSize() const = 0; protected: CidStoreConfiguration m_Config; diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 5a079dbed..8b2797ce9 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -23,14 +23,14 @@ struct CidStore::Impl void Initialize(const CidStoreConfiguration& Config) { m_CasStore.Initialize(Config); } - CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData) + CidStore::InsertResult AddChunk(const CompressedBuffer& ChunkData, CidStore::InsertMode Mode) { const IoHash DecompressedId = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Payload = ChunkData.GetCompressed().Flatten().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCompressedBinary); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, DecompressedId, static_cast<CasStore::InsertMode>(Mode)); return {.New = Result.New}; } @@ -78,9 +78,9 @@ CidStore::Initialize(const CidStoreConfiguration& Config) } CidStore::InsertResult -CidStore::AddChunk(const CompressedBuffer& ChunkData) +CidStore::AddChunk(const CompressedBuffer& ChunkData, InsertMode Mode) { - return m_Impl->AddChunk(ChunkData); + return m_Impl->AddChunk(ChunkData, Mode); } IoBuffer diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index d0a8ad849..9825f225a 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -124,7 +124,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN } CasStore::InsertResult -FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) +FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore::InsertMode Mode) { ZEN_ASSERT(m_IsInitialized); @@ -132,6 +132,16 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) ZEN_ASSERT(Chunk.GetContentType() == ZenContentType::kCompressedBinary); #endif + if (Mode == CasStore::InsertMode::kCopyOnly) + { + ShardingHelper Name(m_RootDirectory.c_str(), ChunkHash); + if (std::filesystem::is_regular_file(Name.ShardedPath.ToPath())) + { + return {.New = false}; + } + return InsertChunk(Chunk.Data(), Chunk.Size(), ChunkHash); + } + // File-based chunks have special case handling whereby we move the file into // place in the file store directory, thus avoiding unnecessary copying @@ -153,6 +163,8 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) if (!Success) { + // TODO: We should provide information to this function to tell it if the payload is temporary or not and if we are allowed + // to delete it. ZEN_WARN("Failed to flag temporary payload file '{}' for deletion: '{}'", PathFromHandle(ChunkFileHandle), GetLastErrorAsString()); @@ -294,6 +306,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) ChunkHash); DeletePayloadFileOnClose(); + #elif ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC std::filesystem::path SourcePath = PathFromHandle(FileRef.FileHandle); std::filesystem::path DestPath = Name.ShardedPath.c_str(); diff --git a/zenstore/filecas.h b/zenstore/filecas.h index f14e5d057..de79b8b81 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -34,7 +34,9 @@ struct FileCasStrategy final : public GcStorage void Initialize(const std::filesystem::path& RootDirectory, bool IsNewStore); CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); - CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); + CasStore::InsertResult InsertChunk(IoBuffer Chunk, + const IoHash& ChunkHash, + CasStore::InsertMode Mode = CasStore::InsertMode::kMayBeMovedInPlace); IoBuffer FindChunk(const IoHash& ChunkHash); bool HaveChunk(const IoHash& ChunkHash); void FilterChunks(HashKeySet& InOutChunks); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 21e3c3160..e8984a83d 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -63,9 +63,14 @@ public: { bool New = false; }; + enum class InsertMode + { + kCopyOnly, + kMayBeMovedInPlace + }; void Initialize(const CidStoreConfiguration& Config); - InsertResult AddChunk(const CompressedBuffer& ChunkData); + InsertResult AddChunk(const CompressedBuffer& ChunkData, InsertMode Mode = InsertMode::kMayBeMovedInPlace); IoBuffer FindChunkByCid(const IoHash& DecompressedId); bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); diff --git a/zenutil/basicfile.cpp b/zenutil/basicfile.cpp index fb77f4bd1..1e6043d7e 100644 --- a/zenutil/basicfile.cpp +++ b/zenutil/basicfile.cpp @@ -72,7 +72,7 @@ BasicFile::Open(const std::filesystem::path& FileName, Mode Mode, std::error_cod break; } - const DWORD dwShareMode = FILE_SHARE_READ; + const DWORD dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE; const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; HANDLE hTemplateFile = nullptr; diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp index 0ac6c35ed..1ba2721b6 100644 --- a/zenutil/cache/cacherequests.cpp +++ b/zenutil/cache/cacherequests.cpp @@ -268,8 +268,9 @@ namespace cacherequests { const CompressedBuffer& Buffer = Value.Body; if (Buffer) { - Writer.AddBinaryAttachment("RawHash", IoHash::FromBLAKE3(Buffer.GetRawHash())); - OutPackage.AddAttachment(CbAttachment(Buffer)); + IoHash AttachmentHash = IoHash::FromBLAKE3(Buffer.GetRawHash()); + Writer.AddBinaryAttachment("RawHash", AttachmentHash); + OutPackage.AddAttachment(CbAttachment(Buffer, AttachmentHash)); Writer.AddInteger("RawSize", Buffer.GetRawSize()); } else @@ -499,16 +500,17 @@ namespace cacherequests { Writer.BeginArray("Values"); for (const GetCacheRecordResultValue& Value : RecordResult->Values) { + IoHash AttachmentHash = Value.Body ? IoHash::FromBLAKE3(Value.Body.GetRawHash()) : Value.RawHash; Writer.BeginObject(); { Writer.AddObjectId("Id", Value.Id); - Writer.AddHash("RawHash", Value.Body ? IoHash::FromBLAKE3(Value.Body.GetRawHash()) : Value.RawHash); + Writer.AddHash("RawHash", AttachmentHash); Writer.AddInteger("RawSize", Value.Body ? Value.Body.GetRawSize() : Value.RawSize); } Writer.EndObject(); if (Value.Body) { - OutPackage.AddAttachment(CbAttachment(Value.Body)); + OutPackage.AddAttachment(CbAttachment(Value.Body, AttachmentHash)); } } @@ -594,8 +596,9 @@ namespace cacherequests { { return false; } - Writer.AddBinaryAttachment("RawHash", IoHash::FromBLAKE3(ValueRequest.Body.GetRawHash())); - OutPackage.AddAttachment(CbAttachment(ValueRequest.Body)); + IoHash AttachmentHash = IoHash::FromBLAKE3(ValueRequest.Body.GetRawHash()); + Writer.AddBinaryAttachment("RawHash", AttachmentHash); + OutPackage.AddAttachment(CbAttachment(ValueRequest.Body, AttachmentHash)); } else if (ValueRequest.RawHash != IoHash::Zero) { @@ -797,7 +800,7 @@ namespace cacherequests { ResponseObject.AddHash("RawHash", ValueResult.RawHash); if (ValueResult.Body) { - OutPackage.AddAttachment(CbAttachment(ValueResult.Body)); + OutPackage.AddAttachment(CbAttachment(ValueResult.Body, ValueResult.RawHash)); } else { |