diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-01 01:00:23 -0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-01 01:00:23 -0800 |
| commit | 149e8636b2965ec0cea0e25285a7d90e312d2b71 (patch) | |
| tree | 243b56d4b15c50e29c323d87c624dd2026e63a58 | |
| parent | fix formatting of zenutil/include/zenutil/zenserverprocess.h (diff) | |
| download | zen-149e8636b2965ec0cea0e25285a7d90e312d2b71.tar.xz zen-149e8636b2965ec0cea0e25285a7d90e312d2b71.zip | |
Clean up project store file structure (#218)
* move project store to separate folder
* moved import/export project commands into projectstore cmd files
| -rw-r--r-- | zen/cmds/exportproject.cpp | 362 | ||||
| -rw-r--r-- | zen/cmds/exportproject.h | 79 | ||||
| -rw-r--r-- | zen/cmds/importproject.cpp | 249 | ||||
| -rw-r--r-- | zen/cmds/importproject.h | 22 | ||||
| -rw-r--r-- | zen/cmds/projectstore.cpp | 619 | ||||
| -rw-r--r-- | zen/cmds/projectstore.h | 34 | ||||
| -rw-r--r-- | zen/zen.cpp | 5 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp (renamed from zenserver/projectstore.cpp) | 0 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.h (renamed from zenserver/projectstore.h) | 0 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 2 |
10 files changed, 656 insertions, 716 deletions
diff --git a/zen/cmds/exportproject.cpp b/zen/cmds/exportproject.cpp deleted file mode 100644 index 6925c9f03..000000000 --- a/zen/cmds/exportproject.cpp +++ /dev/null @@ -1,362 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "exportproject.h" - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalue.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/stream.h> -#include <zencore/uid.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpcommon.h> -#include <zenhttp/httpshared.h> -#include <zenutil/basicfile.h> - -#include <memory> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -#include <gsl/gsl-lite.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -ExportProjectCommand::ExportProjectCommand() -{ - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>"); - m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); - m_Options.parse_positional({"target", "project", "oplog"}); -} - -ExportProjectCommand::~ExportProjectCommand() = default; - -bool -ExportProjectCommand::IsSuccess(const cpr::Response& Response, const std::string_view Operation) -{ - if (!zen::IsHttpSuccessCode(Response.status_code)) - { - if (Response.status_code) - { - ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text); - } - else - { - ZEN_ERROR("{} failed: {}", Operation, Response.error.message); - } - - return false; - } - return true; -} - -int -ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - using namespace std::literals; - - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return 0; - } - - if (m_ProjectName.empty()) - { - ZEN_ERROR("Project name must be given"); - return 1; - } - - if (m_TargetPath.empty()) - { - ZEN_ERROR("Target path must be given"); - return 1; - } - - if (!std::filesystem::exists(m_TargetPath)) - { - zen::CreateDirectories(m_TargetPath); - } - else if (!std::filesystem::is_directory(m_TargetPath)) - { - ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath); - return 1; - } - - const std::string UrlBase = fmt::format("{}/prj", m_HostName); - - cpr::Session Session; - { - ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); - - std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); - Session.SetUrl({ProjectRequest}); - cpr::Response Response = Session.Get(); - if (!IsSuccess(Response, ProjectRequest)) - { - return 1; - } - zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - zen::BasicFile ProjectStore; - ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate); - ProjectStore.Write(Payload.GetView(), 0); - - if (m_OplogNames.empty()) - { - zen::CbObject Params = LoadCompactBinaryObject(Payload); - zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView(); - for (auto& OplogEntry : Oplogs) - { - std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString(); - m_OplogNames.push_back(std::string(OpLog)); - } - } - } - - std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks; - std::vector<zen::CbAttachment> AllAttachments; - std::vector<zen::CbPackage> OplogResponses; - for (const std::string& OplogName : m_OplogNames) - { - ZEN_CONSOLE("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath); - - std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); - Session.SetUrl({GetOplogArchiveRequest}); - Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); - cpr::Response Response = Session.Get(); - if (!IsSuccess(Response, GetOplogArchiveRequest)) - { - return 1; - } - zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); - - OplogResponses.emplace_back(zen::ParsePackageMessage(Payload)); - zen::CbPackage& ResponsePackage = OplogResponses.back(); - zen::CbObject Result = ResponsePackage.GetObject(); - - zen::IoHash Checksum = Result["checksum"sv].AsHash(); - zen ::CbArrayView Entries = Result["entries"sv].AsArrayView(); - - ZEN_CONSOLE("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'", - Entries.Num(), - m_ProjectName, - OplogName, - Checksum, - m_TargetPath); - { - zen::BasicFile OpStore; - OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate); - OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum}; - OpStore.Write(&Header, sizeof(OplogHeader), 0); - std::vector<OplogEntry> OpEntries; - OpEntries.resize(Entries.Num()); - const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry); - uint64_t BulkOffset = DataOffset; - - zen::IoHashStream Hasher; - - for (uint64_t OpIndex = 0; auto& OpEntry : Entries) - { - zen::BinaryWriter Writer; - OpEntry.CopyTo(Writer); - zen::MemoryView OpView = Writer.GetView(); - Hasher.Append(OpView); - - OpEntries[OpIndex].Offset = BulkOffset; - OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize()); - OpStore.Write(OpView, BulkOffset); - BulkOffset += OpView.GetSize(); - OpIndex++; - } - zen::IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return 1; - } - OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader)); - } - - std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments(); - AllAttachments.reserve(AllAttachments.size() + Attachments.size()); - AllAttachments.reserve(UniqueChunks.size() + Attachments.size()); - for (const zen::CbAttachment& Attachment : Attachments) - { - if (UniqueChunks.insert(Attachment.GetHash()).second) - { - AllAttachments.push_back(Attachment); - } - } - - ZEN_CONSOLE("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName); - } - - size_t ChunkCount = AllAttachments.size(); - zen::BasicFile ChunkStoreIndex; - ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate); - ChunksHeader Header = {.ChunkCount = ChunkCount}; - ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0); - std::vector<ChunkEntry> ChunkEntries; - ChunkEntries.resize(ChunkCount); - uint64_t ChunkOffset = 0; - - zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); - std::atomic_int64_t JobCount = 0; - std::vector<size_t> BlockChunkIndexes; - const size_t BlockSize = 1ull << Header.BlockSizeShift; - uint32_t CurrentBlockIndex = 0; - - auto WriteBlockAsync = [](const std::string& TargetPath, - size_t WriteBlockOffset, - uint32_t BlockIndex, - const std::vector<size_t>& BlockChunkIndexes, - const std::vector<ChunkEntry>& ChunkEntries, - const std::vector<zen::CbAttachment>& Attachments, - zen::WorkerThreadPool& WorkerPool, - std::atomic_int64_t& JobCount) { - JobCount.fetch_add(1); - WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() { - zen::BasicFile ChunkBlock; - ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate); - for (size_t ChunkIndex : BlockChunkIndexes) - { - const ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; - zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed(); - size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset; - for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset); - AttachmentBulkOffset += SegmentSize; - } - } - JobCount.fetch_add(-1); - }); - }; - - ZEN_CONSOLE("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); - for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments) - { - ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; - Chunk.ChunkHash = Attachment.GetHash(); - zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed(); - Chunk.Length = AttachmentBody.GetSize(); - - if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize); - if (BlockIndex != CurrentBlockIndex) - { - size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; - WriteBlockAsync(m_TargetPath, - WriteBlockOffset, - CurrentBlockIndex, - BlockChunkIndexes, - ChunkEntries, - AllAttachments, - WorkerPool, - JobCount); - - ChunkOffset = BlockIndex * BlockSize; - CurrentBlockIndex = BlockIndex; - BlockChunkIndexes.clear(); - } - - Chunk.Offset = ChunkOffset; - ChunkOffset = Chunk.Offset + Chunk.Length; - BlockChunkIndexes.push_back(ChunkIndex); - } - else - { - Chunk.Offset = ~0ull; - JobCount.fetch_add(1); - WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() { - std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash); - zen::CreateDirectories(Path.parent_path()); - zen::BasicFile ChunkFile; - ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate); - uint64_t Offset = 0; - for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset); - Offset += SegmentSize; - } - JobCount.fetch_add(-1); - }); - } - ChunkIndex++; - } - if (!BlockChunkIndexes.empty()) - { - size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; - WriteBlockAsync(m_TargetPath, - WriteBlockOffset, - CurrentBlockIndex, - BlockChunkIndexes, - ChunkEntries, - AllAttachments, - WorkerPool, - JobCount); - } - - while (JobCount.load()) - { - zen::Sleep(1); - } - - ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader)); - - ZEN_CONSOLE("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); - - return 0; -} - -std::filesystem::path -ExportProjectCommand::GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog) -{ - return RootPath / (Oplog + ".ops"); -} - -std::filesystem::path -ExportProjectCommand::GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash) -{ - zen::ExtendablePathBuilder<128> ShardedPath; - ShardedPath.Append(RootPath.c_str()); - zen::ExtendableStringBuilder<64> HashString; - OpHash.ToHexString(HashString); - const char* str = HashString.c_str(); - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); - - return ShardedPath.ToPath(); -} - -std::filesystem::path -ExportProjectCommand::GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName) -{ - return RootPath / (std::string(ProjectName) + ".zcb"); -} - -std::filesystem::path -ExportProjectCommand::GetChunksIndexPath(const std::filesystem::path RootPath) -{ - return RootPath / "chunks.idx"; -} - -std::filesystem::path -ExportProjectCommand::GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex) -{ - return RootPath / fmt::format("chunks{}.bin", BlockIndex); -} diff --git a/zen/cmds/exportproject.h b/zen/cmds/exportproject.h deleted file mode 100644 index 3fe7a2263..000000000 --- a/zen/cmds/exportproject.h +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "../zen.h" - -#include <zencore/filesystem.h> -#include <zencore/iohash.h> - -#include <functional> - -namespace zen { -class CbObjectView; -} -namespace cpr { -class Response; -} - -class ExportProjectCommand : public ZenCmdBase -{ -public: - ExportProjectCommand(); - ~ExportProjectCommand(); - - virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; - virtual cxxopts::Options& Options() override { return m_Options; } - - struct OplogHeader - { - enum uint32 - { - kMagic = 0x7816'B013 - }; - uint32_t Magic = kMagic; - uint32_t HeaderSize = sizeof(OplogHeader); - uint64_t OpCount = 0; - zen::IoHash Checksum = zen::IoHash::Zero; - }; - struct OplogEntry - { - uint64_t Offset; - uint32_t OpLength; - }; - - struct ChunksHeader - { - enum uint32 - { - kMagic = 0x574C'B016 - }; - uint32_t Magic = kMagic; - uint64_t ChunkCount = 0; - uint8_t BlockSizeShift = 31u; - uint8_t Reserved1 = 0; - uint16_t Reserved2 = 0; - uint32_t Reserved3 = 0; - }; - struct ChunkEntry - { - zen::IoHash ChunkHash; - uint64_t Offset; - uint64_t Length; - }; - - static std::filesystem::path GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog); - static std::filesystem::path GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash); - static std::filesystem::path GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName); - static std::filesystem::path GetChunksIndexPath(const std::filesystem::path RootPath); - static std::filesystem::path GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex); - - static bool IsSuccess(const cpr::Response& Response, const std::string_view Operation); - -private: - cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"}; - std::string m_HostName; - std::string m_TargetPath; - std::string m_ProjectName; - std::vector<std::string> m_OplogNames; -}; diff --git a/zen/cmds/importproject.cpp b/zen/cmds/importproject.cpp deleted file mode 100644 index 3a2605ebb..000000000 --- a/zen/cmds/importproject.cpp +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "importproject.h" -#include "exportproject.h" - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinarypackage.h> -#include <zencore/compress.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/stream.h> -#include <zenhttp/httpcommon.h> -#include <zenhttp/httpshared.h> -#include <zenutil/basicfile.h> - -#include <memory> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> -ZEN_THIRD_PARTY_INCLUDES_END - -ImportProjectCommand::ImportProjectCommand() -{ - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>"); - m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); - m_Options.parse_positional({"source", "project", "oplog"}); -} - -ImportProjectCommand::~ImportProjectCommand() = default; - -int -ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - using namespace std::literals; - - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return 0; - } - - if (m_ProjectName.empty()) - { - ZEN_ERROR("Project name must be given"); - return 1; - } - - if (m_SourcePath.empty()) - { - ZEN_ERROR("Source path must be given"); - return 1; - } - - if (!std::filesystem::is_directory(m_SourcePath)) - { - ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath); - return 1; - } - - const std::string UrlBase = fmt::format("{}/prj", m_HostName); - - cpr::Session Session; - - { - ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); - - zen::BasicFile ProjectStore; - ProjectStore.Open(ExportProjectCommand::GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead); - zen::IoBuffer Payload = ProjectStore.ReadAll(); - - std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); - Session.SetUrl({ProjectRequest}); - Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); - cpr::Response Response = Session.Post(); - if (!ExportProjectCommand::IsSuccess(Response, ProjectRequest)) - { - return 1; - } - if (m_OplogNames.empty()) - { - zen::DirectoryContent Content; - zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content); - for (auto& File : Content.Files) - { - if (File.extension() == ".ops") - { - m_OplogNames.push_back(File.stem().string()); - } - } - } - } - - zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksIndexPath(m_SourcePath)); - zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ExportProjectCommand::ChunksHeader)); - const ExportProjectCommand::ChunksHeader* ChunksHeader = - reinterpret_cast<const ExportProjectCommand::ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData()); - - if (ChunksHeader->Magic != ExportProjectCommand::ChunksHeader::kMagic) - { - ZEN_ERROR("Invalid chunk index header"); - return 1; - } - const size_t BlockSize = 1ull << ChunksHeader->BlockSizeShift; - - zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex, - sizeof(ExportProjectCommand::ChunksHeader), - sizeof(ExportProjectCommand::ChunkEntry) * ChunksHeader->ChunkCount); - const ExportProjectCommand::ChunkEntry* ChunkEntries = - reinterpret_cast<const ExportProjectCommand::ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData()); - - for (const std::string& OplogName : m_OplogNames) - { - ZEN_CONSOLE("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName); - std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName); - Session.SetUrl(GetOplogRequest); - cpr::Response OplogResponse = Session.Get(); - if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) - { - OplogResponse = Session.Post(); - if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest)) - { - return 1; - } - ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest); - OplogResponse = Session.Get(); - if (!ExportProjectCommand::IsSuccess(OplogResponse, GetOplogRequest)) - { - return 1; - } - } - - zen::BasicFile OpStore; - OpStore.Open(ExportProjectCommand::GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead); - ExportProjectCommand::OplogHeader OplogHeader; - OpStore.Read(&OplogHeader, sizeof(ExportProjectCommand::OplogHeader), 0); - if (OplogHeader.Magic != ExportProjectCommand::OplogHeader::kMagic || - OplogHeader.HeaderSize != sizeof(ExportProjectCommand::OplogHeader)) - { - ZEN_ERROR("Invalid oplog header"); - return 1; - } - zen::IoHash Checksum = OplogHeader.Checksum; - std::vector<ExportProjectCommand::OplogEntry> OpEntries; - OpEntries.resize(OplogHeader.OpCount); - OpStore.Read(OpEntries.data(), - sizeof(ExportProjectCommand::OplogEntry) * OplogHeader.OpCount, - sizeof(ExportProjectCommand::OplogHeader)); - - ZEN_CONSOLE("Constructing oplog with {} ops with checksum '{}' for '{}/{}'", - OpEntries.size(), - OplogHeader.Checksum, - m_ProjectName, - OplogName); - zen::IoHashStream Hasher; - zen::CbObjectWriter Request; - Request.BeginArray("entries"sv); - for (auto& OpEntry : OpEntries) - { - zen::IoBuffer CoreData(OpEntry.OpLength); - OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset); - Hasher.Append(CoreData.GetView()); - zen::SharedBuffer SharedCoreData(CoreData); - - zen::CbObject Op(SharedCoreData); - Request << Op; - } - Request.EndArray(); - zen::IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return 1; - } - Request.AddHash("checksum"sv, Checksum); - - zen::CbPackage RequestPackage; - RequestPackage.SetObject(Request.Save()); - - ZEN_CONSOLE("Assembling {} attachments", ChunksHeader->ChunkCount); - std::vector<zen::CbAttachment> Attachments; - Attachments.reserve(ChunksHeader->ChunkCount); - uint32_t ReadBlockIndex = 0; - zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex)); - for (uint64_t ChunkIndex = 0; ChunkIndex < ChunksHeader->ChunkCount; ++ChunkIndex) - { - const ExportProjectCommand::ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex]; - if (ChunkEntry.Offset == ~0ull) - { - zen::IoBuffer ChunkBuffer = - zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash)); - zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - ZEN_ASSERT(Chunk); - Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); - } - else - { - uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize); - if (BlockIndex != ReadBlockIndex) - { - ReadBlockIndex = BlockIndex; - BlockStore = zen::IoBufferBuilder::MakeFromFile(ExportProjectCommand::GetChunksPath(m_SourcePath, ReadBlockIndex)); - ZEN_ASSERT(BlockStore); - } - size_t BlockOffset = BlockIndex * BlockSize; - size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset; - zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length); - zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); - } - } - RequestPackage.AddAttachments(Attachments); - - ZEN_CONSOLE("Sending oplog with {} ops and {} attachments for '{}/{}' to {}", - OpEntries.size(), - ChunksHeader->ChunkCount, - m_ProjectName, - OplogName, - m_HostName); - std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences); - std::vector<zen::SharedBuffer> Parts; - Parts.reserve(RequestPayload.size()); - for (const auto& I : RequestPayload) - { - Parts.emplace_back(zen::SharedBuffer(I)); - } - zen::CompositeBuffer Cmp(std::move(Parts)); - zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp); - - std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); - Session.SetUrl(AppendOplogRequest); - - zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer(); - Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()}); - cpr::Response Response = Session.Post(); - if (!ExportProjectCommand::IsSuccess(Response, AppendOplogRequest)) - { - return 1; - } - - ZEN_CONSOLE("Imported {} ops and {} chunks", OpEntries.size(), ChunksHeader->ChunkCount); - } - return 0; -} diff --git a/zen/cmds/importproject.h b/zen/cmds/importproject.h deleted file mode 100644 index 8d79f06fd..000000000 --- a/zen/cmds/importproject.h +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include "../zen.h" - -class ImportProjectCommand : public ZenCmdBase -{ -public: - ImportProjectCommand(); - ~ImportProjectCommand(); - - virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; - virtual cxxopts::Options& Options() override { return m_Options; } - -private: - cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"}; - std::string m_HostName; - std::string m_SourcePath; - std::string m_ProjectName; - std::vector<std::string> m_OplogNames; -}; diff --git a/zen/cmds/projectstore.cpp b/zen/cmds/projectstore.cpp index d83bff53c..03e1130cb 100644 --- a/zen/cmds/projectstore.cpp +++ b/zen/cmds/projectstore.cpp @@ -2,17 +2,135 @@ #include "projectstore.h" +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/stream.h> +#include <zencore/uid.h> +#include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> +#include <zenutil/basicfile.h> #include <zenutil/zenserverprocess.h> #include <memory> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> +#include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END +namespace { +struct OplogHeader +{ + enum uint32 + { + kMagic = 0x7816'B013 + }; + uint32_t Magic = kMagic; + uint32_t HeaderSize = sizeof(OplogHeader); + uint64_t OpCount = 0; + zen::IoHash Checksum = zen::IoHash::Zero; +}; +struct OplogEntry +{ + uint64_t Offset; + uint32_t OpLength; +}; + +struct ChunksHeader +{ + enum uint32 + { + kMagic = 0x574C'B016 + }; + uint32_t Magic = kMagic; + uint64_t ChunkCount = 0; + uint8_t BlockSizeShift = 31u; + uint8_t Reserved1 = 0; + uint16_t Reserved2 = 0; + uint32_t Reserved3 = 0; +}; +struct ChunkEntry +{ + zen::IoHash ChunkHash; + uint64_t Offset; + uint64_t Length; +}; + +std::filesystem::path +GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog) +{ + return RootPath / (Oplog + ".ops"); +} + +std::filesystem::path +GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash) +{ + zen::ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(RootPath.c_str()); + zen::ExtendableStringBuilder<64> HashString; + OpHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); +} + +std::filesystem::path +GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName) +{ + return RootPath / (std::string(ProjectName) + ".zcb"); +} + +std::filesystem::path +GetChunksIndexPath(const std::filesystem::path RootPath) +{ + return RootPath / "chunks.idx"; +} + +std::filesystem::path +GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex) +{ + return RootPath / fmt::format("chunks{}.bin", BlockIndex); +} + +bool +IsSuccess(const cpr::Response& Response, const std::string_view Operation) +{ + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + if (Response.status_code) + { + ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text); + } + else + { + ZEN_ERROR("{} failed: {}", Operation, Response.error.message); + } + + return false; + } + return true; +} + +} // namespace + +/////////////////////////////////////// + DropProjectCommand::DropProjectCommand() { m_Options.add_options()("h,help", "Print help"); @@ -71,6 +189,8 @@ DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return 1; } +/////////////////////////////////////// + ProjectInfoCommand::ProjectInfoCommand() { m_Options.add_options()("h,help", "Print help"); @@ -130,3 +250,502 @@ ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return 1; } + +/////////////////////////////////////// + +ExportProjectCommand::ExportProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); + m_Options.parse_positional({"target", "project", "oplog"}); +} + +ExportProjectCommand::~ExportProjectCommand() = default; + +int +ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_TargetPath.empty()) + { + ZEN_ERROR("Target path must be given"); + return 1; + } + + if (!std::filesystem::exists(m_TargetPath)) + { + zen::CreateDirectories(m_TargetPath); + } + else if (!std::filesystem::is_directory(m_TargetPath)) + { + ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath); + return 1; + } + + const std::string UrlBase = fmt::format("{}/prj", m_HostName); + + cpr::Session Session; + { + ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); + + std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); + Session.SetUrl({ProjectRequest}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, ProjectRequest)) + { + return 1; + } + zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::BasicFile ProjectStore; + ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate); + ProjectStore.Write(Payload.GetView(), 0); + + if (m_OplogNames.empty()) + { + zen::CbObject Params = LoadCompactBinaryObject(Payload); + zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView(); + for (auto& OplogEntry : Oplogs) + { + std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString(); + m_OplogNames.push_back(std::string(OpLog)); + } + } + } + + std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks; + std::vector<zen::CbAttachment> AllAttachments; + std::vector<zen::CbPackage> OplogResponses; + for (const std::string& OplogName : m_OplogNames) + { + ZEN_CONSOLE("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath); + + std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); + Session.SetUrl({GetOplogArchiveRequest}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, GetOplogArchiveRequest)) + { + return 1; + } + zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); + + OplogResponses.emplace_back(zen::ParsePackageMessage(Payload)); + zen::CbPackage& ResponsePackage = OplogResponses.back(); + zen::CbObject Result = ResponsePackage.GetObject(); + + zen::IoHash Checksum = Result["checksum"sv].AsHash(); + zen ::CbArrayView Entries = Result["entries"sv].AsArrayView(); + + ZEN_CONSOLE("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'", + Entries.Num(), + m_ProjectName, + OplogName, + Checksum, + m_TargetPath); + { + zen::BasicFile OpStore; + OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate); + OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum}; + OpStore.Write(&Header, sizeof(OplogHeader), 0); + std::vector<OplogEntry> OpEntries; + OpEntries.resize(Entries.Num()); + const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry); + uint64_t BulkOffset = DataOffset; + + zen::IoHashStream Hasher; + + for (uint64_t OpIndex = 0; auto& OpEntry : Entries) + { + zen::BinaryWriter Writer; + OpEntry.CopyTo(Writer); + zen::MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + + OpEntries[OpIndex].Offset = BulkOffset; + OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize()); + OpStore.Write(OpView, BulkOffset); + BulkOffset += OpView.GetSize(); + OpIndex++; + } + zen::IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return 1; + } + OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader)); + } + + std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments(); + AllAttachments.reserve(AllAttachments.size() + Attachments.size()); + AllAttachments.reserve(UniqueChunks.size() + Attachments.size()); + for (const zen::CbAttachment& Attachment : Attachments) + { + if (UniqueChunks.insert(Attachment.GetHash()).second) + { + AllAttachments.push_back(Attachment); + } + } + + ZEN_CONSOLE("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName); + } + + size_t ChunkCount = AllAttachments.size(); + zen::BasicFile ChunkStoreIndex; + ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate); + ChunksHeader Header = {.ChunkCount = ChunkCount}; + ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0); + std::vector<ChunkEntry> ChunkEntries; + ChunkEntries.resize(ChunkCount); + uint64_t ChunkOffset = 0; + + zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + std::atomic_int64_t JobCount = 0; + std::vector<size_t> BlockChunkIndexes; + const size_t BlockSize = 1ull << Header.BlockSizeShift; + uint32_t CurrentBlockIndex = 0; + + auto WriteBlockAsync = [](const std::string& TargetPath, + size_t WriteBlockOffset, + uint32_t BlockIndex, + const std::vector<size_t>& BlockChunkIndexes, + const std::vector<ChunkEntry>& ChunkEntries, + const std::vector<zen::CbAttachment>& Attachments, + zen::WorkerThreadPool& WorkerPool, + std::atomic_int64_t& JobCount) { + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() { + zen::BasicFile ChunkBlock; + ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate); + for (size_t ChunkIndex : BlockChunkIndexes) + { + const ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed(); + size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset); + AttachmentBulkOffset += SegmentSize; + } + } + JobCount.fetch_add(-1); + }); + }; + + ZEN_CONSOLE("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments) + { + ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + Chunk.ChunkHash = Attachment.GetHash(); + zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed(); + Chunk.Length = AttachmentBody.GetSize(); + + if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize); + if (BlockIndex != CurrentBlockIndex) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + + ChunkOffset = BlockIndex * BlockSize; + CurrentBlockIndex = BlockIndex; + BlockChunkIndexes.clear(); + } + + Chunk.Offset = ChunkOffset; + ChunkOffset = Chunk.Offset + Chunk.Length; + BlockChunkIndexes.push_back(ChunkIndex); + } + else + { + Chunk.Offset = ~0ull; + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() { + std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash); + zen::CreateDirectories(Path.parent_path()); + zen::BasicFile ChunkFile; + ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate); + uint64_t Offset = 0; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset); + Offset += SegmentSize; + } + JobCount.fetch_add(-1); + }); + } + ChunkIndex++; + } + if (!BlockChunkIndexes.empty()) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + } + + while (JobCount.load()) + { + zen::Sleep(1); + } + + ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader)); + + ZEN_CONSOLE("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + + return 0; +} + +//////////////////////////// + +ImportProjectCommand::ImportProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); + m_Options.parse_positional({"source", "project", "oplog"}); +} + +ImportProjectCommand::~ImportProjectCommand() = default; + +int +ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_SourcePath.empty()) + { + ZEN_ERROR("Source path must be given"); + return 1; + } + + if (!std::filesystem::is_directory(m_SourcePath)) + { + ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath); + return 1; + } + + const std::string UrlBase = fmt::format("{}/prj", m_HostName); + + cpr::Session Session; + + { + ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); + + zen::BasicFile ProjectStore; + ProjectStore.Open(GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead); + zen::IoBuffer Payload = ProjectStore.ReadAll(); + + std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); + Session.SetUrl({ProjectRequest}); + Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); + cpr::Response Response = Session.Post(); + if (!IsSuccess(Response, ProjectRequest)) + { + return 1; + } + if (m_OplogNames.empty()) + { + zen::DirectoryContent Content; + zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content); + for (auto& File : Content.Files) + { + if (File.extension() == ".ops") + { + m_OplogNames.push_back(File.stem().string()); + } + } + } + } + + zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(GetChunksIndexPath(m_SourcePath)); + zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ChunksHeader)); + const ChunksHeader* Header = reinterpret_cast<const ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData()); + + if (Header->Magic != ChunksHeader::kMagic) + { + ZEN_ERROR("Invalid chunk index header"); + return 1; + } + const size_t BlockSize = 1ull << Header->BlockSizeShift; + + zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex, sizeof(Header), sizeof(ChunkEntry) * Header->ChunkCount); + const ChunkEntry* ChunkEntries = reinterpret_cast<const ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData()); + + for (const std::string& OplogName : m_OplogNames) + { + ZEN_CONSOLE("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName); + std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName); + Session.SetUrl(GetOplogRequest); + cpr::Response OplogResponse = Session.Get(); + if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) + { + OplogResponse = Session.Post(); + if (!IsSuccess(OplogResponse, GetOplogRequest)) + { + return 1; + } + IsSuccess(OplogResponse, GetOplogRequest); + OplogResponse = Session.Get(); + if (!IsSuccess(OplogResponse, GetOplogRequest)) + { + return 1; + } + } + + zen::BasicFile OpStore; + OpStore.Open(GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead); + OplogHeader OplogHeader; + OpStore.Read(&OplogHeader, sizeof(OplogHeader), 0); + if (OplogHeader.Magic != OplogHeader::kMagic || OplogHeader.HeaderSize != sizeof(OplogHeader)) + { + ZEN_ERROR("Invalid oplog header"); + return 1; + } + zen::IoHash Checksum = OplogHeader.Checksum; + std::vector<OplogEntry> OpEntries; + OpEntries.resize(OplogHeader.OpCount); + OpStore.Read(OpEntries.data(), sizeof(OplogEntry) * OplogHeader.OpCount, sizeof(OplogHeader)); + + ZEN_CONSOLE("Constructing oplog with {} ops with checksum '{}' for '{}/{}'", + OpEntries.size(), + OplogHeader.Checksum, + m_ProjectName, + OplogName); + zen::IoHashStream Hasher; + zen::CbObjectWriter Request; + Request.BeginArray("entries"sv); + for (auto& OpEntry : OpEntries) + { + zen::IoBuffer CoreData(OpEntry.OpLength); + OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset); + Hasher.Append(CoreData.GetView()); + zen::SharedBuffer SharedCoreData(CoreData); + + zen::CbObject Op(SharedCoreData); + Request << Op; + } + Request.EndArray(); + zen::IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return 1; + } + Request.AddHash("checksum"sv, Checksum); + + zen::CbPackage RequestPackage; + RequestPackage.SetObject(Request.Save()); + + ZEN_CONSOLE("Assembling {} attachments", Header->ChunkCount); + std::vector<zen::CbAttachment> Attachments; + Attachments.reserve(Header->ChunkCount); + uint32_t ReadBlockIndex = 0; + zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex)); + for (uint64_t ChunkIndex = 0; ChunkIndex < Header->ChunkCount; ++ChunkIndex) + { + const ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex]; + if (ChunkEntry.Offset == ~0ull) + { + zen::IoBuffer ChunkBuffer = zen::IoBufferBuilder::MakeFromFile(GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash)); + zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); + ZEN_ASSERT(Chunk); + Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); + } + else + { + uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize); + if (BlockIndex != ReadBlockIndex) + { + ReadBlockIndex = BlockIndex; + BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex)); + ZEN_ASSERT(BlockStore); + } + size_t BlockOffset = BlockIndex * BlockSize; + size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset; + zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length); + zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); + Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); + } + } + RequestPackage.AddAttachments(Attachments); + + ZEN_CONSOLE("Sending oplog with {} ops and {} attachments for '{}/{}' to {}", + OpEntries.size(), + Header->ChunkCount, + m_ProjectName, + OplogName, + m_HostName); + std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences); + std::vector<zen::SharedBuffer> Parts; + Parts.reserve(RequestPayload.size()); + for (const auto& I : RequestPayload) + { + Parts.emplace_back(zen::SharedBuffer(I)); + } + zen::CompositeBuffer Cmp(std::move(Parts)); + zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp); + + std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); + Session.SetUrl(AppendOplogRequest); + + zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer(); + Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()}); + cpr::Response Response = Session.Post(); + if (!IsSuccess(Response, AppendOplogRequest)) + { + return 1; + } + + ZEN_CONSOLE("Imported {} ops and {} chunks", OpEntries.size(), Header->ChunkCount); + } + return 0; +} diff --git a/zen/cmds/projectstore.h b/zen/cmds/projectstore.h index 98e60cb17..5d42afd81 100644 --- a/zen/cmds/projectstore.h +++ b/zen/cmds/projectstore.h @@ -34,3 +34,37 @@ private: std::string m_ProjectName; std::string m_OplogName; }; + +class ExportProjectCommand : public ZenCmdBase +{ +public: + ExportProjectCommand(); + ~ExportProjectCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"}; + std::string m_HostName; + std::string m_TargetPath; + std::string m_ProjectName; + std::vector<std::string> m_OplogNames; +}; + +class ImportProjectCommand : public ZenCmdBase +{ +public: + ImportProjectCommand(); + ~ImportProjectCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"}; + std::string m_HostName; + std::string m_SourcePath; + std::string m_ProjectName; + std::vector<std::string> m_OplogNames; +}; diff --git a/zen/zen.cpp b/zen/zen.cpp index e02f3a510..180dea5f4 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -9,9 +9,7 @@ #include "cmds/cache.h" #include "cmds/copy.h" #include "cmds/dedup.h" -#include "cmds/exportproject.h" #include "cmds/hash.h" -#include "cmds/importproject.h" #include "cmds/print.h" #include "cmds/projectstore.h" #include "cmds/scrub.h" @@ -20,6 +18,7 @@ #include "cmds/up.h" #include "cmds/version.h" +#include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> @@ -137,7 +136,7 @@ main(int argc, char** argv) #endif zen::logging::InitializeLogging(); - MaximizeOpenFileCount(); + zen::MaximizeOpenFileCount(); ////////////////////////////////////////////////////////////////////////// diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index 216a8cbf6..216a8cbf6 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp diff --git a/zenserver/projectstore.h b/zenserver/projectstore/projectstore.h index 8267bd9e0..8267bd9e0 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore/projectstore.h diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 526e27152..cf771c6de 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -108,7 +108,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "frontend/frontend.h" #include "monitoring/httpstats.h" #include "monitoring/httpstatus.h" -#include "projectstore.h" +#include "projectstore/projectstore.h" #include "testing/httptest.h" #include "upstream/upstream.h" #include "zenstore/gc.h" |