aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-01 01:00:23 -0800
committerGitHub <[email protected]>2023-02-01 01:00:23 -0800
commit149e8636b2965ec0cea0e25285a7d90e312d2b71 (patch)
tree243b56d4b15c50e29c323d87c624dd2026e63a58
parentfix formatting of zenutil/include/zenutil/zenserverprocess.h (diff)
downloadzen-149e8636b2965ec0cea0e25285a7d90e312d2b71.tar.xz
zen-149e8636b2965ec0cea0e25285a7d90e312d2b71.zip
Clean up project store file structure (#218)
* move project store to separate folder * moved import/export project commands into projectstore cmd files
-rw-r--r--zen/cmds/exportproject.cpp362
-rw-r--r--zen/cmds/exportproject.h79
-rw-r--r--zen/cmds/importproject.cpp249
-rw-r--r--zen/cmds/importproject.h22
-rw-r--r--zen/cmds/projectstore.cpp619
-rw-r--r--zen/cmds/projectstore.h34
-rw-r--r--zen/zen.cpp5
-rw-r--r--zenserver/projectstore/projectstore.cpp (renamed from zenserver/projectstore.cpp)0
-rw-r--r--zenserver/projectstore/projectstore.h (renamed from zenserver/projectstore.h)0
-rw-r--r--zenserver/zenserver.cpp2
10 files changed, 656 insertions, 716 deletions
diff --git a/zen/cmds/exportproject.cpp b/zen/cmds/exportproject.cpp
deleted file mode 100644
index 6925c9f03..000000000
--- a/zen/cmds/exportproject.cpp
+++ /dev/null
@@ -1,362 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "exportproject.h"
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalue.h>
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/stream.h>
-#include <zencore/uid.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
-#include <zenutil/basicfile.h>
-
-#include <memory>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-#include <gsl/gsl-lite.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-ExportProjectCommand::ExportProjectCommand()
-{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
- m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>");
- m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
- m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
- m_Options.parse_positional({"target", "project", "oplog"});
-}
-
-ExportProjectCommand::~ExportProjectCommand() = default;
-
-bool
-ExportProjectCommand::IsSuccess(const cpr::Response& Response, const std::string_view Operation)
-{
- if (!zen::IsHttpSuccessCode(Response.status_code))
- {
- if (Response.status_code)
- {
- ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text);
- }
- else
- {
- ZEN_ERROR("{} failed: {}", Operation, Response.error.message);
- }
-
- return false;
- }
- return true;
-}
-
-int
-ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
-{
- using namespace std::literals;
-
- ZEN_UNUSED(GlobalOptions);
-
- if (!ParseOptions(argc, argv))
- {
- return 0;
- }
-
- if (m_ProjectName.empty())
- {
- ZEN_ERROR("Project name must be given");
- return 1;
- }
-
- if (m_TargetPath.empty())
- {
- ZEN_ERROR("Target path must be given");
- return 1;
- }
-
- if (!std::filesystem::exists(m_TargetPath))
- {
- zen::CreateDirectories(m_TargetPath);
- }
- else if (!std::filesystem::is_directory(m_TargetPath))
- {
- ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath);
- return 1;
- }
-
- const std::string UrlBase = fmt::format("{}/prj", m_HostName);
-
- cpr::Session Session;
- {
- ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
-
- std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
- Session.SetUrl({ProjectRequest});
- cpr::Response Response = Session.Get();
- if (!IsSuccess(Response, ProjectRequest))
- {
- return 1;
- }
- zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- zen::BasicFile ProjectStore;
- ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate);
- ProjectStore.Write(Payload.GetView(), 0);
-
- if (m_OplogNames.empty())
- {
- zen::CbObject Params = LoadCompactBinaryObject(Payload);
- zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView();
- for (auto& OplogEntry : Oplogs)
- {
- std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString();
- m_OplogNames.push_back(std::string(OpLog));
- }
- }
- }
-
- std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks;
- std::vector<zen::CbAttachment> AllAttachments;
- std::vector<zen::CbPackage> OplogResponses;
- for (const std::string& OplogName : m_OplogNames)
- {
- ZEN_CONSOLE("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath);
-
- std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
- Session.SetUrl({GetOplogArchiveRequest});
- Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
- cpr::Response Response = Session.Get();
- if (!IsSuccess(Response, GetOplogArchiveRequest))
- {
- return 1;
- }
- zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer();
-
- OplogResponses.emplace_back(zen::ParsePackageMessage(Payload));
- zen::CbPackage& ResponsePackage = OplogResponses.back();
- zen::CbObject Result = ResponsePackage.GetObject();
-
- zen::IoHash Checksum = Result["checksum"sv].AsHash();
- zen ::CbArrayView Entries = Result["entries"sv].AsArrayView();
-
- ZEN_CONSOLE("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'",
- Entries.Num(),
- m_ProjectName,
- OplogName,
- Checksum,
- m_TargetPath);
- {
- zen::BasicFile OpStore;
- OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate);
- OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum};
- OpStore.Write(&Header, sizeof(OplogHeader), 0);
- std::vector<OplogEntry> OpEntries;
- OpEntries.resize(Entries.Num());
- const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry);
- uint64_t BulkOffset = DataOffset;
-
- zen::IoHashStream Hasher;
-
- for (uint64_t OpIndex = 0; auto& OpEntry : Entries)
- {
- zen::BinaryWriter Writer;
- OpEntry.CopyTo(Writer);
- zen::MemoryView OpView = Writer.GetView();
- Hasher.Append(OpView);
-
- OpEntries[OpIndex].Offset = BulkOffset;
- OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize());
- OpStore.Write(OpView, BulkOffset);
- BulkOffset += OpView.GetSize();
- OpIndex++;
- }
- zen::IoHash CalculatedChecksum = Hasher.GetHash();
- if (CalculatedChecksum != Checksum)
- {
- ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
- return 1;
- }
- OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader));
- }
-
- std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments();
- AllAttachments.reserve(AllAttachments.size() + Attachments.size());
- AllAttachments.reserve(UniqueChunks.size() + Attachments.size());
- for (const zen::CbAttachment& Attachment : Attachments)
- {
- if (UniqueChunks.insert(Attachment.GetHash()).second)
- {
- AllAttachments.push_back(Attachment);
- }
- }
-
- ZEN_CONSOLE("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName);
- }
-
- size_t ChunkCount = AllAttachments.size();
- zen::BasicFile ChunkStoreIndex;
- ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate);
- ChunksHeader Header = {.ChunkCount = ChunkCount};
- ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0);
- std::vector<ChunkEntry> ChunkEntries;
- ChunkEntries.resize(ChunkCount);
- uint64_t ChunkOffset = 0;
-
- zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
- std::atomic_int64_t JobCount = 0;
- std::vector<size_t> BlockChunkIndexes;
- const size_t BlockSize = 1ull << Header.BlockSizeShift;
- uint32_t CurrentBlockIndex = 0;
-
- auto WriteBlockAsync = [](const std::string& TargetPath,
- size_t WriteBlockOffset,
- uint32_t BlockIndex,
- const std::vector<size_t>& BlockChunkIndexes,
- const std::vector<ChunkEntry>& ChunkEntries,
- const std::vector<zen::CbAttachment>& Attachments,
- zen::WorkerThreadPool& WorkerPool,
- std::atomic_int64_t& JobCount) {
- JobCount.fetch_add(1);
- WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() {
- zen::BasicFile ChunkBlock;
- ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate);
- for (size_t ChunkIndex : BlockChunkIndexes)
- {
- const ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
- zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed();
- size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset;
- for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
- {
- size_t SegmentSize = Segment.GetSize();
- ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset);
- AttachmentBulkOffset += SegmentSize;
- }
- }
- JobCount.fetch_add(-1);
- });
- };
-
- ZEN_CONSOLE("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
- for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments)
- {
- ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
- Chunk.ChunkHash = Attachment.GetHash();
- zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed();
- Chunk.Length = AttachmentBody.GetSize();
-
- if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize);
- if (BlockIndex != CurrentBlockIndex)
- {
- size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
- WriteBlockAsync(m_TargetPath,
- WriteBlockOffset,
- CurrentBlockIndex,
- BlockChunkIndexes,
- ChunkEntries,
- AllAttachments,
- WorkerPool,
- JobCount);
-
- ChunkOffset = BlockIndex * BlockSize;
- CurrentBlockIndex = BlockIndex;
- BlockChunkIndexes.clear();
- }
-
- Chunk.Offset = ChunkOffset;
- ChunkOffset = Chunk.Offset + Chunk.Length;
- BlockChunkIndexes.push_back(ChunkIndex);
- }
- else
- {
- Chunk.Offset = ~0ull;
- JobCount.fetch_add(1);
- WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() {
- std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash);
- zen::CreateDirectories(Path.parent_path());
- zen::BasicFile ChunkFile;
- ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate);
- uint64_t Offset = 0;
- for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
- {
- size_t SegmentSize = Segment.GetSize();
- ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset);
- Offset += SegmentSize;
- }
- JobCount.fetch_add(-1);
- });
- }
- ChunkIndex++;
- }
- if (!BlockChunkIndexes.empty())
- {
- size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
- WriteBlockAsync(m_TargetPath,
- WriteBlockOffset,
- CurrentBlockIndex,
- BlockChunkIndexes,
- ChunkEntries,
- AllAttachments,
- WorkerPool,
- JobCount);
- }
-
- while (JobCount.load())
- {
- zen::Sleep(1);
- }
-
- ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader));
-
- ZEN_CONSOLE("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
-
- return 0;
-}
-
-std::filesystem::path
-ExportProjectCommand::GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog)
-{
- return RootPath / (Oplog + ".ops");
-}
-
-std::filesystem::path
-ExportProjectCommand::GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash)
-{
- zen::ExtendablePathBuilder<128> ShardedPath;
- ShardedPath.Append(RootPath.c_str());
- zen::ExtendableStringBuilder<64> HashString;
- OpHash.ToHexString(HashString);
- const char* str = HashString.c_str();
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str, str + 3);
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 3, str + 5);
-
- ShardedPath.AppendSeparator();
- ShardedPath.AppendAsciiRange(str + 5, str + 40);
-
- return ShardedPath.ToPath();
-}
-
-std::filesystem::path
-ExportProjectCommand::GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName)
-{
- return RootPath / (std::string(ProjectName) + ".zcb");
-}
-
-std::filesystem::path
-ExportProjectCommand::GetChunksIndexPath(const std::filesystem::path RootPath)
-{
- return RootPath / "chunks.idx";
-}
-
-std::filesystem::path
-ExportProjectCommand::GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex)
-{
- return RootPath / fmt::format("chunks{}.bin", BlockIndex);
-}
diff --git a/zen/cmds/exportproject.h b/zen/cmds/exportproject.h
deleted file mode 100644
index 3fe7a2263..000000000
--- a/zen/cmds/exportproject.h
+++ /dev/null
@@ -1,79 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "../zen.h"
-
-#include <zencore/filesystem.h>
-#include <zencore/iohash.h>
-
-#include <functional>
-
-namespace zen {
-class CbObjectView;
-}
-namespace cpr {
-class Response;
-}
-
-class ExportProjectCommand : public ZenCmdBase
-{
-public:
- ExportProjectCommand();
- ~ExportProjectCommand();
-
- virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
- virtual cxxopts::Options& Options() override { return m_Options; }
-
- struct OplogHeader
- {
- enum uint32
- {
- kMagic = 0x7816'B013
- };
- uint32_t Magic = kMagic;
- uint32_t HeaderSize = sizeof(OplogHeader);
- uint64_t OpCount = 0;
- zen::IoHash Checksum = zen::IoHash::Zero;
- };
- struct OplogEntry
- {
- uint64_t Offset;
- uint32_t OpLength;
- };
-
- struct ChunksHeader
- {
- enum uint32
- {
- kMagic = 0x574C'B016
- };
- uint32_t Magic = kMagic;
- uint64_t ChunkCount = 0;
- uint8_t BlockSizeShift = 31u;
- uint8_t Reserved1 = 0;
- uint16_t Reserved2 = 0;
- uint32_t Reserved3 = 0;
- };
- struct ChunkEntry
- {
- zen::IoHash ChunkHash;
- uint64_t Offset;
- uint64_t Length;
- };
-
- static std::filesystem::path GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog);
- static std::filesystem::path GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash);
- static std::filesystem::path GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName);
- static std::filesystem::path GetChunksIndexPath(const std::filesystem::path RootPath);
- static std::filesystem::path GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex);
-
- static bool IsSuccess(const cpr::Response& Response, const std::string_view Operation);
-
-private:
- cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"};
- std::string m_HostName;
- std::string m_TargetPath;
- std::string m_ProjectName;
- std::vector<std::string> m_OplogNames;
-};
diff --git a/zen/cmds/importproject.cpp b/zen/cmds/importproject.cpp
deleted file mode 100644
index 3a2605ebb..000000000
--- a/zen/cmds/importproject.cpp
+++ /dev/null
@@ -1,249 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "importproject.h"
-#include "exportproject.h"
-
-#include <zencore/compactbinary.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinarypackage.h>
-#include <zencore/compress.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/stream.h>
-#include <zenhttp/httpcommon.h>
-#include <zenhttp/httpshared.h>
-#include <zenutil/basicfile.h>
-
-#include <memory>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-ImportProjectCommand::ImportProjectCommand()
-{
- m_Options.add_options()("h,help", "Print help");
- m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
- m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>");
- m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
- m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
- m_Options.parse_positional({"source", "project", "oplog"});
-}
-
-ImportProjectCommand::~ImportProjectCommand() = default;
-
-int
-ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
-{
- using namespace std::literals;
-
- ZEN_UNUSED(GlobalOptions);
-
- if (!ParseOptions(argc, argv))
- {
- return 0;
- }
-
- if (m_ProjectName.empty())
- {
- ZEN_ERROR("Project name must be given");
- return 1;
- }
-
- if (m_SourcePath.empty())
- {
- ZEN_ERROR("Source path must be given");
- return 1;
- }
-
- if (!std::filesystem::is_directory(m_SourcePath))
- {
- ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath);
- return 1;
- }
-
- const std::string UrlBase = fmt::format("{}/prj", m_HostName);
-
- cpr::Session Session;
-
- {
- ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
-
- zen::BasicFile ProjectStore;
- ProjectStore.Open(ExportProjectCommand::GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead);
- zen::IoBuffer Payload = ProjectStore.ReadAll();
-
- std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
- Session.SetUrl({ProjectRequest});
- Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
- cpr::Response Response = Session.Post();
- if (!ExportProjectCommand::IsSuccess(Response, ProjectRequest))
- {
- return 1;
- }
- if (m_OplogNames.empty())
- {
- zen::DirectoryContent Content;
- zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content);
- for (auto& File : Content.Files)
- {
- if (File.extension() == ".ops")
- {
- m_OplogNames.push_back(File.stem().string());
- }
- }
- }
- }
-
- zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksIndexPath(m_SourcePath));
- zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ExportProjectCommand::ChunksHeader));
- const ExportProjectCommand::ChunksHeader* ChunksHeader =
- reinterpret_cast<const ExportProjectCommand::ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData());
-
- if (ChunksHeader->Magic != ExportProjectCommand::ChunksHeader::kMagic)
- {
- ZEN_ERROR("Invalid chunk index header");
- return 1;
- }
- const size_t BlockSize = 1ull << ChunksHeader->BlockSizeShift;
-
- zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex,
- sizeof(ExportProjectCommand::ChunksHeader),
- sizeof(ExportProjectCommand::ChunkEntry) * ChunksHeader->ChunkCount);
- const ExportProjectCommand::ChunkEntry* ChunkEntries =
- reinterpret_cast<const ExportProjectCommand::ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData());
-
- for (const std::string& OplogName : m_OplogNames)
- {
- ZEN_CONSOLE("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName);
- std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName);
- Session.SetUrl(GetOplogRequest);
- cpr::Response OplogResponse = Session.Get();
- if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
- {
- OplogResponse = Session.Post();
- if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest))
- {
- return 1;
- }
- ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest);
- OplogResponse = Session.Get();
- if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest))
- {
- return 1;
- }
- }
-
- zen::BasicFile OpStore;
- OpStore.Open(ExportProjectCommand::GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead);
- ExportProjectCommand::OplogHeader OplogHeader;
- OpStore.Read(&OplogHeader, sizeof(ExportProjectCommand::OplogHeader), 0);
- if (OplogHeader.Magic != ExportProjectCommand::OplogHeader::kMagic ||
- OplogHeader.HeaderSize != sizeof(ExportProjectCommand::OplogHeader))
- {
- ZEN_ERROR("Invalid oplog header");
- return 1;
- }
- zen::IoHash Checksum = OplogHeader.Checksum;
- std::vector<ExportProjectCommand::OplogEntry> OpEntries;
- OpEntries.resize(OplogHeader.OpCount);
- OpStore.Read(OpEntries.data(),
- sizeof(ExportProjectCommand::OplogEntry) * OplogHeader.OpCount,
- sizeof(ExportProjectCommand::OplogHeader));
-
- ZEN_CONSOLE("Constructing oplog with {} ops with checksum '{}' for '{}/{}'",
- OpEntries.size(),
- OplogHeader.Checksum,
- m_ProjectName,
- OplogName);
- zen::IoHashStream Hasher;
- zen::CbObjectWriter Request;
- Request.BeginArray("entries"sv);
- for (auto& OpEntry : OpEntries)
- {
- zen::IoBuffer CoreData(OpEntry.OpLength);
- OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset);
- Hasher.Append(CoreData.GetView());
- zen::SharedBuffer SharedCoreData(CoreData);
-
- zen::CbObject Op(SharedCoreData);
- Request << Op;
- }
- Request.EndArray();
- zen::IoHash CalculatedChecksum = Hasher.GetHash();
- if (CalculatedChecksum != Checksum)
- {
- ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
- return 1;
- }
- Request.AddHash("checksum"sv, Checksum);
-
- zen::CbPackage RequestPackage;
- RequestPackage.SetObject(Request.Save());
-
- ZEN_CONSOLE("Assembling {} attachments", ChunksHeader->ChunkCount);
- std::vector<zen::CbAttachment> Attachments;
- Attachments.reserve(ChunksHeader->ChunkCount);
- uint32_t ReadBlockIndex = 0;
- zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex));
- for (uint64_t ChunkIndex = 0; ChunkIndex < ChunksHeader->ChunkCount; ++ChunkIndex)
- {
- const ExportProjectCommand::ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex];
- if (ChunkEntry.Offset == ~0ull)
- {
- zen::IoBuffer ChunkBuffer =
- zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash));
- zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
- ZEN_ASSERT(Chunk);
- Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
- }
- else
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize);
- if (BlockIndex != ReadBlockIndex)
- {
- ReadBlockIndex = BlockIndex;
- BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex));
- ZEN_ASSERT(BlockStore);
- }
- size_t BlockOffset = BlockIndex * BlockSize;
- size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset;
- zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length);
- zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
- Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
- }
- }
- RequestPackage.AddAttachments(Attachments);
-
- ZEN_CONSOLE("Sending oplog with {} ops and {} attachments for '{}/{}' to {}",
- OpEntries.size(),
- ChunksHeader->ChunkCount,
- m_ProjectName,
- OplogName,
- m_HostName);
- std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences);
- std::vector<zen::SharedBuffer> Parts;
- Parts.reserve(RequestPayload.size());
- for (const auto& I : RequestPayload)
- {
- Parts.emplace_back(zen::SharedBuffer(I));
- }
- zen::CompositeBuffer Cmp(std::move(Parts));
- zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp);
-
- std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
- Session.SetUrl(AppendOplogRequest);
-
- zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer();
- Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()});
- cpr::Response Response = Session.Post();
- if (!ExportProjectCommand::IsSuccess(Response, AppendOplogRequest))
- {
- return 1;
- }
-
- ZEN_CONSOLE("Imported {} ops and {} chunks", OpEntries.size(), ChunksHeader->ChunkCount);
- }
- return 0;
-}
diff --git a/zen/cmds/importproject.h b/zen/cmds/importproject.h
deleted file mode 100644
index 8d79f06fd..000000000
--- a/zen/cmds/importproject.h
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include "../zen.h"
-
-class ImportProjectCommand : public ZenCmdBase
-{
-public:
- ImportProjectCommand();
- ~ImportProjectCommand();
-
- virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
- virtual cxxopts::Options& Options() override { return m_Options; }
-
-private:
- cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"};
- std::string m_HostName;
- std::string m_SourcePath;
- std::string m_ProjectName;
- std::vector<std::string> m_OplogNames;
-};
diff --git a/zen/cmds/projectstore.cpp b/zen/cmds/projectstore.cpp
index d83bff53c..03e1130cb 100644
--- a/zen/cmds/projectstore.cpp
+++ b/zen/cmds/projectstore.cpp
@@ -2,17 +2,135 @@
#include "projectstore.h"
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalue.h>
+#include <zencore/compress.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/stream.h>
+#include <zencore/uid.h>
+#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
+#include <zenutil/basicfile.h>
#include <zenutil/zenserverprocess.h>
#include <memory>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
+#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
+namespace {
+struct OplogHeader
+{
+ enum uint32
+ {
+ kMagic = 0x7816'B013
+ };
+ uint32_t Magic = kMagic;
+ uint32_t HeaderSize = sizeof(OplogHeader);
+ uint64_t OpCount = 0;
+ zen::IoHash Checksum = zen::IoHash::Zero;
+};
+struct OplogEntry
+{
+ uint64_t Offset;
+ uint32_t OpLength;
+};
+
+struct ChunksHeader
+{
+ enum uint32
+ {
+ kMagic = 0x574C'B016
+ };
+ uint32_t Magic = kMagic;
+ uint64_t ChunkCount = 0;
+ uint8_t BlockSizeShift = 31u;
+ uint8_t Reserved1 = 0;
+ uint16_t Reserved2 = 0;
+ uint32_t Reserved3 = 0;
+};
+struct ChunkEntry
+{
+ zen::IoHash ChunkHash;
+ uint64_t Offset;
+ uint64_t Length;
+};
+
+std::filesystem::path
+GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog)
+{
+ return RootPath / (Oplog + ".ops");
+}
+
+std::filesystem::path
+GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash)
+{
+ zen::ExtendablePathBuilder<128> ShardedPath;
+ ShardedPath.Append(RootPath.c_str());
+ zen::ExtendableStringBuilder<64> HashString;
+ OpHash.ToHexString(HashString);
+ const char* str = HashString.c_str();
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str, str + 3);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 3, str + 5);
+
+ ShardedPath.AppendSeparator();
+ ShardedPath.AppendAsciiRange(str + 5, str + 40);
+
+ return ShardedPath.ToPath();
+}
+
+std::filesystem::path
+GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName)
+{
+ return RootPath / (std::string(ProjectName) + ".zcb");
+}
+
+std::filesystem::path
+GetChunksIndexPath(const std::filesystem::path RootPath)
+{
+ return RootPath / "chunks.idx";
+}
+
+std::filesystem::path
+GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex)
+{
+ return RootPath / fmt::format("chunks{}.bin", BlockIndex);
+}
+
+bool
+IsSuccess(const cpr::Response& Response, const std::string_view Operation)
+{
+ if (!zen::IsHttpSuccessCode(Response.status_code))
+ {
+ if (Response.status_code)
+ {
+ ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text);
+ }
+ else
+ {
+ ZEN_ERROR("{} failed: {}", Operation, Response.error.message);
+ }
+
+ return false;
+ }
+ return true;
+}
+
+} // namespace
+
+///////////////////////////////////////
+
DropProjectCommand::DropProjectCommand()
{
m_Options.add_options()("h,help", "Print help");
@@ -71,6 +189,8 @@ DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
+///////////////////////////////////////
+
ProjectInfoCommand::ProjectInfoCommand()
{
m_Options.add_options()("h,help", "Print help");
@@ -130,3 +250,502 @@ ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
+
+///////////////////////////////////////
+
+ExportProjectCommand::ExportProjectCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
+ m_Options.parse_positional({"target", "project", "oplog"});
+}
+
+ExportProjectCommand::~ExportProjectCommand() = default;
+
+int
+ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_TargetPath.empty())
+ {
+ ZEN_ERROR("Target path must be given");
+ return 1;
+ }
+
+ if (!std::filesystem::exists(m_TargetPath))
+ {
+ zen::CreateDirectories(m_TargetPath);
+ }
+ else if (!std::filesystem::is_directory(m_TargetPath))
+ {
+ ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath);
+ return 1;
+ }
+
+ const std::string UrlBase = fmt::format("{}/prj", m_HostName);
+
+ cpr::Session Session;
+ {
+ ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
+
+ std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
+ Session.SetUrl({ProjectRequest});
+ cpr::Response Response = Session.Get();
+ if (!IsSuccess(Response, ProjectRequest))
+ {
+ return 1;
+ }
+ zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::BasicFile ProjectStore;
+ ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate);
+ ProjectStore.Write(Payload.GetView(), 0);
+
+ if (m_OplogNames.empty())
+ {
+ zen::CbObject Params = LoadCompactBinaryObject(Payload);
+ zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView();
+ for (auto& OplogEntry : Oplogs)
+ {
+ std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString();
+ m_OplogNames.push_back(std::string(OpLog));
+ }
+ }
+ }
+
+ std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks;
+ std::vector<zen::CbAttachment> AllAttachments;
+ std::vector<zen::CbPackage> OplogResponses;
+ for (const std::string& OplogName : m_OplogNames)
+ {
+ ZEN_CONSOLE("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath);
+
+ std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl({GetOplogArchiveRequest});
+ Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
+ cpr::Response Response = Session.Get();
+ if (!IsSuccess(Response, GetOplogArchiveRequest))
+ {
+ return 1;
+ }
+ zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer();
+
+ OplogResponses.emplace_back(zen::ParsePackageMessage(Payload));
+ zen::CbPackage& ResponsePackage = OplogResponses.back();
+ zen::CbObject Result = ResponsePackage.GetObject();
+
+ zen::IoHash Checksum = Result["checksum"sv].AsHash();
+ zen ::CbArrayView Entries = Result["entries"sv].AsArrayView();
+
+ ZEN_CONSOLE("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'",
+ Entries.Num(),
+ m_ProjectName,
+ OplogName,
+ Checksum,
+ m_TargetPath);
+ {
+ zen::BasicFile OpStore;
+ OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate);
+ OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum};
+ OpStore.Write(&Header, sizeof(OplogHeader), 0);
+ std::vector<OplogEntry> OpEntries;
+ OpEntries.resize(Entries.Num());
+ const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry);
+ uint64_t BulkOffset = DataOffset;
+
+ zen::IoHashStream Hasher;
+
+ for (uint64_t OpIndex = 0; auto& OpEntry : Entries)
+ {
+ zen::BinaryWriter Writer;
+ OpEntry.CopyTo(Writer);
+ zen::MemoryView OpView = Writer.GetView();
+ Hasher.Append(OpView);
+
+ OpEntries[OpIndex].Offset = BulkOffset;
+ OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize());
+ OpStore.Write(OpView, BulkOffset);
+ BulkOffset += OpView.GetSize();
+ OpIndex++;
+ }
+ zen::IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return 1;
+ }
+ OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader));
+ }
+
+ std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments();
+ AllAttachments.reserve(AllAttachments.size() + Attachments.size());
+ AllAttachments.reserve(UniqueChunks.size() + Attachments.size());
+ for (const zen::CbAttachment& Attachment : Attachments)
+ {
+ if (UniqueChunks.insert(Attachment.GetHash()).second)
+ {
+ AllAttachments.push_back(Attachment);
+ }
+ }
+
+ ZEN_CONSOLE("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName);
+ }
+
+ size_t ChunkCount = AllAttachments.size();
+ zen::BasicFile ChunkStoreIndex;
+ ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate);
+ ChunksHeader Header = {.ChunkCount = ChunkCount};
+ ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0);
+ std::vector<ChunkEntry> ChunkEntries;
+ ChunkEntries.resize(ChunkCount);
+ uint64_t ChunkOffset = 0;
+
+ zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ std::atomic_int64_t JobCount = 0;
+ std::vector<size_t> BlockChunkIndexes;
+ const size_t BlockSize = 1ull << Header.BlockSizeShift;
+ uint32_t CurrentBlockIndex = 0;
+
+ auto WriteBlockAsync = [](const std::string& TargetPath,
+ size_t WriteBlockOffset,
+ uint32_t BlockIndex,
+ const std::vector<size_t>& BlockChunkIndexes,
+ const std::vector<ChunkEntry>& ChunkEntries,
+ const std::vector<zen::CbAttachment>& Attachments,
+ zen::WorkerThreadPool& WorkerPool,
+ std::atomic_int64_t& JobCount) {
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() {
+ zen::BasicFile ChunkBlock;
+ ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate);
+ for (size_t ChunkIndex : BlockChunkIndexes)
+ {
+ const ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
+ zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed();
+ size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset;
+ for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset);
+ AttachmentBulkOffset += SegmentSize;
+ }
+ }
+ JobCount.fetch_add(-1);
+ });
+ };
+
+ ZEN_CONSOLE("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
+ for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments)
+ {
+ ChunkEntry& Chunk = ChunkEntries[ChunkIndex];
+ Chunk.ChunkHash = Attachment.GetHash();
+ zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed();
+ Chunk.Length = AttachmentBody.GetSize();
+
+ if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize);
+ if (BlockIndex != CurrentBlockIndex)
+ {
+ size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
+ WriteBlockAsync(m_TargetPath,
+ WriteBlockOffset,
+ CurrentBlockIndex,
+ BlockChunkIndexes,
+ ChunkEntries,
+ AllAttachments,
+ WorkerPool,
+ JobCount);
+
+ ChunkOffset = BlockIndex * BlockSize;
+ CurrentBlockIndex = BlockIndex;
+ BlockChunkIndexes.clear();
+ }
+
+ Chunk.Offset = ChunkOffset;
+ ChunkOffset = Chunk.Offset + Chunk.Length;
+ BlockChunkIndexes.push_back(ChunkIndex);
+ }
+ else
+ {
+ Chunk.Offset = ~0ull;
+ JobCount.fetch_add(1);
+ WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() {
+ std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash);
+ zen::CreateDirectories(Path.parent_path());
+ zen::BasicFile ChunkFile;
+ ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate);
+ uint64_t Offset = 0;
+ for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments())
+ {
+ size_t SegmentSize = Segment.GetSize();
+ ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset);
+ Offset += SegmentSize;
+ }
+ JobCount.fetch_add(-1);
+ });
+ }
+ ChunkIndex++;
+ }
+ if (!BlockChunkIndexes.empty())
+ {
+ size_t WriteBlockOffset = CurrentBlockIndex * BlockSize;
+ WriteBlockAsync(m_TargetPath,
+ WriteBlockOffset,
+ CurrentBlockIndex,
+ BlockChunkIndexes,
+ ChunkEntries,
+ AllAttachments,
+ WorkerPool,
+ JobCount);
+ }
+
+ while (JobCount.load())
+ {
+ zen::Sleep(1);
+ }
+
+ ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader));
+
+ ZEN_CONSOLE("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath);
+
+ return 0;
+}
+
+////////////////////////////
+
+ImportProjectCommand::ImportProjectCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>");
+ m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>");
+ m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>");
+ m_Options.parse_positional({"source", "project", "oplog"});
+}
+
+ImportProjectCommand::~ImportProjectCommand() = default;
+
+int
+ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ using namespace std::literals;
+
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_ProjectName.empty())
+ {
+ ZEN_ERROR("Project name must be given");
+ return 1;
+ }
+
+ if (m_SourcePath.empty())
+ {
+ ZEN_ERROR("Source path must be given");
+ return 1;
+ }
+
+ if (!std::filesystem::is_directory(m_SourcePath))
+ {
+ ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath);
+ return 1;
+ }
+
+ const std::string UrlBase = fmt::format("{}/prj", m_HostName);
+
+ cpr::Session Session;
+
+ {
+ ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName);
+
+ zen::BasicFile ProjectStore;
+ ProjectStore.Open(GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead);
+ zen::IoBuffer Payload = ProjectStore.ReadAll();
+
+ std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName);
+ Session.SetUrl({ProjectRequest});
+ Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()});
+ cpr::Response Response = Session.Post();
+ if (!IsSuccess(Response, ProjectRequest))
+ {
+ return 1;
+ }
+ if (m_OplogNames.empty())
+ {
+ zen::DirectoryContent Content;
+ zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content);
+ for (auto& File : Content.Files)
+ {
+ if (File.extension() == ".ops")
+ {
+ m_OplogNames.push_back(File.stem().string());
+ }
+ }
+ }
+ }
+
+ zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(GetChunksIndexPath(m_SourcePath));
+ zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ChunksHeader));
+ const ChunksHeader* Header = reinterpret_cast<const ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData());
+
+ if (Header->Magic != ChunksHeader::kMagic)
+ {
+ ZEN_ERROR("Invalid chunk index header");
+ return 1;
+ }
+ const size_t BlockSize = 1ull << Header->BlockSizeShift;
+
+ zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex, sizeof(Header), sizeof(ChunkEntry) * Header->ChunkCount);
+ const ChunkEntry* ChunkEntries = reinterpret_cast<const ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData());
+
+ for (const std::string& OplogName : m_OplogNames)
+ {
+ ZEN_CONSOLE("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName);
+ std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl(GetOplogRequest);
+ cpr::Response OplogResponse = Session.Get();
+ if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound))
+ {
+ OplogResponse = Session.Post();
+ if (!IsSuccess(OplogResponse, GetOplogRequest))
+ {
+ return 1;
+ }
+ IsSuccess(OplogResponse, GetOplogRequest);
+ OplogResponse = Session.Get();
+ if (!IsSuccess(OplogResponse, GetOplogRequest))
+ {
+ return 1;
+ }
+ }
+
+ zen::BasicFile OpStore;
+ OpStore.Open(GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead);
+ OplogHeader OplogHeader;
+ OpStore.Read(&OplogHeader, sizeof(OplogHeader), 0);
+ if (OplogHeader.Magic != OplogHeader::kMagic || OplogHeader.HeaderSize != sizeof(OplogHeader))
+ {
+ ZEN_ERROR("Invalid oplog header");
+ return 1;
+ }
+ zen::IoHash Checksum = OplogHeader.Checksum;
+ std::vector<OplogEntry> OpEntries;
+ OpEntries.resize(OplogHeader.OpCount);
+ OpStore.Read(OpEntries.data(), sizeof(OplogEntry) * OplogHeader.OpCount, sizeof(OplogHeader));
+
+ ZEN_CONSOLE("Constructing oplog with {} ops with checksum '{}' for '{}/{}'",
+ OpEntries.size(),
+ OplogHeader.Checksum,
+ m_ProjectName,
+ OplogName);
+ zen::IoHashStream Hasher;
+ zen::CbObjectWriter Request;
+ Request.BeginArray("entries"sv);
+ for (auto& OpEntry : OpEntries)
+ {
+ zen::IoBuffer CoreData(OpEntry.OpLength);
+ OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset);
+ Hasher.Append(CoreData.GetView());
+ zen::SharedBuffer SharedCoreData(CoreData);
+
+ zen::CbObject Op(SharedCoreData);
+ Request << Op;
+ }
+ Request.EndArray();
+ zen::IoHash CalculatedChecksum = Hasher.GetHash();
+ if (CalculatedChecksum != Checksum)
+ {
+ ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
+ return 1;
+ }
+ Request.AddHash("checksum"sv, Checksum);
+
+ zen::CbPackage RequestPackage;
+ RequestPackage.SetObject(Request.Save());
+
+ ZEN_CONSOLE("Assembling {} attachments", Header->ChunkCount);
+ std::vector<zen::CbAttachment> Attachments;
+ Attachments.reserve(Header->ChunkCount);
+ uint32_t ReadBlockIndex = 0;
+ zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex));
+ for (uint64_t ChunkIndex = 0; ChunkIndex < Header->ChunkCount; ++ChunkIndex)
+ {
+ const ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex];
+ if (ChunkEntry.Offset == ~0ull)
+ {
+ zen::IoBuffer ChunkBuffer = zen::IoBufferBuilder::MakeFromFile(GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash));
+ zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
+ ZEN_ASSERT(Chunk);
+ Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
+ }
+ else
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize);
+ if (BlockIndex != ReadBlockIndex)
+ {
+ ReadBlockIndex = BlockIndex;
+ BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex));
+ ZEN_ASSERT(BlockStore);
+ }
+ size_t BlockOffset = BlockIndex * BlockSize;
+ size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset;
+ zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length);
+ zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer));
+ Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash));
+ }
+ }
+ RequestPackage.AddAttachments(Attachments);
+
+ ZEN_CONSOLE("Sending oplog with {} ops and {} attachments for '{}/{}' to {}",
+ OpEntries.size(),
+ Header->ChunkCount,
+ m_ProjectName,
+ OplogName,
+ m_HostName);
+ std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences);
+ std::vector<zen::SharedBuffer> Parts;
+ Parts.reserve(RequestPayload.size());
+ for (const auto& I : RequestPayload)
+ {
+ Parts.emplace_back(zen::SharedBuffer(I));
+ }
+ zen::CompositeBuffer Cmp(std::move(Parts));
+ zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp);
+
+ std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName);
+ Session.SetUrl(AppendOplogRequest);
+
+ zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer();
+ Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()});
+ cpr::Response Response = Session.Post();
+ if (!IsSuccess(Response, AppendOplogRequest))
+ {
+ return 1;
+ }
+
+ ZEN_CONSOLE("Imported {} ops and {} chunks", OpEntries.size(), Header->ChunkCount);
+ }
+ return 0;
+}
diff --git a/zen/cmds/projectstore.h b/zen/cmds/projectstore.h
index 98e60cb17..5d42afd81 100644
--- a/zen/cmds/projectstore.h
+++ b/zen/cmds/projectstore.h
@@ -34,3 +34,37 @@ private:
std::string m_ProjectName;
std::string m_OplogName;
};
+
+class ExportProjectCommand : public ZenCmdBase
+{
+public:
+ ExportProjectCommand();
+ ~ExportProjectCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"};
+ std::string m_HostName;
+ std::string m_TargetPath;
+ std::string m_ProjectName;
+ std::vector<std::string> m_OplogNames;
+};
+
+class ImportProjectCommand : public ZenCmdBase
+{
+public:
+ ImportProjectCommand();
+ ~ImportProjectCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"};
+ std::string m_HostName;
+ std::string m_SourcePath;
+ std::string m_ProjectName;
+ std::vector<std::string> m_OplogNames;
+};
diff --git a/zen/zen.cpp b/zen/zen.cpp
index e02f3a510..180dea5f4 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -9,9 +9,7 @@
#include "cmds/cache.h"
#include "cmds/copy.h"
#include "cmds/dedup.h"
-#include "cmds/exportproject.h"
#include "cmds/hash.h"
-#include "cmds/importproject.h"
#include "cmds/print.h"
#include "cmds/projectstore.h"
#include "cmds/scrub.h"
@@ -20,6 +18,7 @@
#include "cmds/up.h"
#include "cmds/version.h"
+#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
@@ -137,7 +136,7 @@ main(int argc, char** argv)
#endif
zen::logging::InitializeLogging();
- MaximizeOpenFileCount();
+ zen::MaximizeOpenFileCount();
//////////////////////////////////////////////////////////////////////////
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index 216a8cbf6..216a8cbf6 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
diff --git a/zenserver/projectstore.h b/zenserver/projectstore/projectstore.h
index 8267bd9e0..8267bd9e0 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore/projectstore.h
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 526e27152..cf771c6de 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -108,7 +108,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "frontend/frontend.h"
#include "monitoring/httpstats.h"
#include "monitoring/httpstatus.h"
-#include "projectstore.h"
+#include "projectstore/projectstore.h"
#include "testing/httptest.h"
#include "upstream/upstream.h"
#include "zenstore/gc.h"