diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-02 13:59:49 -0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-02 13:59:49 -0800 |
| commit | 43eca4b121b76988c1fcda2db42145fe6a22c62e (patch) | |
| tree | 4ffb3eff40e071c47a614d7ec8225f20f6484af5 | |
| parent | Add `project-create` and `oplog-create` to zen command line tool (#221) (diff) | |
| download | zen-43eca4b121b76988c1fcda2db42145fe6a22c62e.tar.xz zen-43eca4b121b76988c1fcda2db42145fe6a22c62e.zip | |
remove legacy `export-project` and `import-project` (#222)
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | zen/cmds/projectstore.cpp | 602 | ||||
| -rw-r--r-- | zen/cmds/projectstore.h | 34 | ||||
| -rw-r--r-- | zen/zen.cpp | 4 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 203 |
5 files changed, 2 insertions, 843 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bb224d8b..6de34c8ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Improvement: Faster oplog replay - reduces time to open an existing oplog - Improvement: Clearer error messages and logging when requests to project store fails - Changed: Removed remnants of old mesh experiment +- Changed: Remove obsolete export-project command +- Changed: Removed remnants import-project command ## 0.2.2 - Feature: Added info (GET) endpoints for structured cache diff --git a/zen/cmds/projectstore.cpp b/zen/cmds/projectstore.cpp index d31a11abc..c53f0bc35 100644 --- a/zen/cmds/projectstore.cpp +++ b/zen/cmds/projectstore.cpp @@ -26,109 +26,6 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <gsl/gsl-lite.hpp> ZEN_THIRD_PARTY_INCLUDES_END -namespace { -struct OplogHeader -{ - enum uint32 - { - kMagic = 0x7816'B013 - }; - uint32_t Magic = kMagic; - uint32_t HeaderSize = sizeof(OplogHeader); - uint64_t OpCount = 0; - zen::IoHash Checksum = zen::IoHash::Zero; -}; -struct OplogEntry -{ - uint64_t Offset; - uint32_t OpLength; -}; - -struct ChunksHeader -{ - enum uint32 - { - kMagic = 0x574C'B016 - }; - uint32_t Magic = kMagic; - uint64_t ChunkCount = 0; - uint8_t BlockSizeShift = 31u; - uint8_t Reserved1 = 0; - uint16_t Reserved2 = 0; - uint32_t Reserved3 = 0; -}; -struct ChunkEntry -{ - zen::IoHash ChunkHash; - uint64_t Offset; - uint64_t Length; -}; - -std::filesystem::path -GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog) -{ - return RootPath / (Oplog + ".ops"); -} - -std::filesystem::path -GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash) -{ - zen::ExtendablePathBuilder<128> ShardedPath; - ShardedPath.Append(RootPath.c_str()); - zen::ExtendableStringBuilder<64> HashString; - OpHash.ToHexString(HashString); - const char* str = HashString.c_str(); - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str, str + 3); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 3, str + 5); - - ShardedPath.AppendSeparator(); - ShardedPath.AppendAsciiRange(str + 5, str + 40); - - return ShardedPath.ToPath(); -} - -std::filesystem::path -GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName) -{ - return RootPath / (std::string(ProjectName) + ".zcb"); -} - -std::filesystem::path -GetChunksIndexPath(const std::filesystem::path RootPath) -{ - return RootPath / "chunks.idx"; -} - -std::filesystem::path -GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex) -{ - return RootPath / fmt::format("chunks{}.bin", BlockIndex); -} - -bool -IsSuccess(const cpr::Response& Response, const std::string_view Operation) -{ - if (!zen::IsHttpSuccessCode(Response.status_code)) - { - if (Response.status_code) - { - ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text); - } - else - { - ZEN_ERROR("{} failed: {}", Operation, Response.error.message); - } - - return false; - } - return true; -} - -} // namespace - /////////////////////////////////////// DropProjectCommand::DropProjectCommand() @@ -382,502 +279,3 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return GetReturnCode(Response); } - -/////////////////////////////////////// - -ExportProjectCommand::ExportProjectCommand() -{ - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>"); - m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); - m_Options.parse_positional({"target", "project", "oplog"}); -} - -ExportProjectCommand::~ExportProjectCommand() = default; - -int -ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - using namespace std::literals; - - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return 0; - } - - if (m_ProjectName.empty()) - { - ZEN_ERROR("Project name must be given"); - return 1; - } - - if (m_TargetPath.empty()) - { - ZEN_ERROR("Target path must be given"); - return 1; - } - - if (!std::filesystem::exists(m_TargetPath)) - { - zen::CreateDirectories(m_TargetPath); - } - else if (!std::filesystem::is_directory(m_TargetPath)) - { - ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath); - return 1; - } - - const std::string UrlBase = fmt::format("{}/prj", m_HostName); - - cpr::Session Session; - { - ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); - - std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); - Session.SetUrl({ProjectRequest}); - cpr::Response Response = Session.Get(); - if (!IsSuccess(Response, ProjectRequest)) - { - return 1; - } - zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - zen::BasicFile ProjectStore; - ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate); - ProjectStore.Write(Payload.GetView(), 0); - - if (m_OplogNames.empty()) - { - zen::CbObject Params = LoadCompactBinaryObject(Payload); - zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView(); - for (auto& OplogEntry : Oplogs) - { - std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString(); - m_OplogNames.push_back(std::string(OpLog)); - } - } - } - - std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks; - std::vector<zen::CbAttachment> AllAttachments; - std::vector<zen::CbPackage> OplogResponses; - for (const std::string& OplogName : m_OplogNames) - { - ZEN_CONSOLE("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath); - - std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); - Session.SetUrl({GetOplogArchiveRequest}); - Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); - cpr::Response Response = Session.Get(); - if (!IsSuccess(Response, GetOplogArchiveRequest)) - { - return 1; - } - zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); - - OplogResponses.emplace_back(zen::ParsePackageMessage(Payload)); - zen::CbPackage& ResponsePackage = OplogResponses.back(); - zen::CbObject Result = ResponsePackage.GetObject(); - - zen::IoHash Checksum = Result["checksum"sv].AsHash(); - zen ::CbArrayView Entries = Result["entries"sv].AsArrayView(); - - ZEN_CONSOLE("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'", - Entries.Num(), - m_ProjectName, - OplogName, - Checksum, - m_TargetPath); - { - zen::BasicFile OpStore; - OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate); - OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum}; - OpStore.Write(&Header, sizeof(OplogHeader), 0); - std::vector<OplogEntry> OpEntries; - OpEntries.resize(Entries.Num()); - const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry); - uint64_t BulkOffset = DataOffset; - - zen::IoHashStream Hasher; - - for (uint64_t OpIndex = 0; auto& OpEntry : Entries) - { - zen::BinaryWriter Writer; - OpEntry.CopyTo(Writer); - zen::MemoryView OpView = Writer.GetView(); - Hasher.Append(OpView); - - OpEntries[OpIndex].Offset = BulkOffset; - OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize()); - OpStore.Write(OpView, BulkOffset); - BulkOffset += OpView.GetSize(); - OpIndex++; - } - zen::IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return 1; - } - OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader)); - } - - std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments(); - AllAttachments.reserve(AllAttachments.size() + Attachments.size()); - AllAttachments.reserve(UniqueChunks.size() + Attachments.size()); - for (const zen::CbAttachment& Attachment : Attachments) - { - if (UniqueChunks.insert(Attachment.GetHash()).second) - { - AllAttachments.push_back(Attachment); - } - } - - ZEN_CONSOLE("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName); - } - - size_t ChunkCount = AllAttachments.size(); - zen::BasicFile ChunkStoreIndex; - ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate); - ChunksHeader Header = {.ChunkCount = ChunkCount}; - ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0); - std::vector<ChunkEntry> ChunkEntries; - ChunkEntries.resize(ChunkCount); - uint64_t ChunkOffset = 0; - - zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); - std::atomic_int64_t JobCount = 0; - std::vector<size_t> BlockChunkIndexes; - const size_t BlockSize = 1ull << Header.BlockSizeShift; - uint32_t CurrentBlockIndex = 0; - - auto WriteBlockAsync = [](const std::string& TargetPath, - size_t WriteBlockOffset, - uint32_t BlockIndex, - const std::vector<size_t>& BlockChunkIndexes, - const std::vector<ChunkEntry>& ChunkEntries, - const std::vector<zen::CbAttachment>& Attachments, - zen::WorkerThreadPool& WorkerPool, - std::atomic_int64_t& JobCount) { - JobCount.fetch_add(1); - WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() { - zen::BasicFile ChunkBlock; - ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate); - for (size_t ChunkIndex : BlockChunkIndexes) - { - const ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; - zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed(); - size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset; - for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset); - AttachmentBulkOffset += SegmentSize; - } - } - JobCount.fetch_add(-1); - }); - }; - - ZEN_CONSOLE("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); - for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments) - { - ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; - Chunk.ChunkHash = Attachment.GetHash(); - zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed(); - Chunk.Length = AttachmentBody.GetSize(); - - if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize); - if (BlockIndex != CurrentBlockIndex) - { - size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; - WriteBlockAsync(m_TargetPath, - WriteBlockOffset, - CurrentBlockIndex, - BlockChunkIndexes, - ChunkEntries, - AllAttachments, - WorkerPool, - JobCount); - - ChunkOffset = BlockIndex * BlockSize; - CurrentBlockIndex = BlockIndex; - BlockChunkIndexes.clear(); - } - - Chunk.Offset = ChunkOffset; - ChunkOffset = Chunk.Offset + Chunk.Length; - BlockChunkIndexes.push_back(ChunkIndex); - } - else - { - Chunk.Offset = ~0ull; - JobCount.fetch_add(1); - WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() { - std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash); - zen::CreateDirectories(Path.parent_path()); - zen::BasicFile ChunkFile; - ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate); - uint64_t Offset = 0; - for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset); - Offset += SegmentSize; - } - JobCount.fetch_add(-1); - }); - } - ChunkIndex++; - } - if (!BlockChunkIndexes.empty()) - { - size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; - WriteBlockAsync(m_TargetPath, - WriteBlockOffset, - CurrentBlockIndex, - BlockChunkIndexes, - ChunkEntries, - AllAttachments, - WorkerPool, - JobCount); - } - - while (JobCount.load()) - { - zen::Sleep(1); - } - - ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader)); - - ZEN_CONSOLE("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); - - return 0; -} - -//////////////////////////// - -ImportProjectCommand::ImportProjectCommand() -{ - m_Options.add_options()("h,help", "Print help"); - m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); - m_Options.add_option("", "s", "source", "Source path", cxxopts::value(m_SourcePath), "<sourcepath>"); - m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); - m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); - m_Options.parse_positional({"source", "project", "oplog"}); -} - -ImportProjectCommand::~ImportProjectCommand() = default; - -int -ImportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) -{ - using namespace std::literals; - - ZEN_UNUSED(GlobalOptions); - - if (!ParseOptions(argc, argv)) - { - return 0; - } - - if (m_ProjectName.empty()) - { - ZEN_ERROR("Project name must be given"); - return 1; - } - - if (m_SourcePath.empty()) - { - ZEN_ERROR("Source path must be given"); - return 1; - } - - if (!std::filesystem::is_directory(m_SourcePath)) - { - ZEN_ERROR("Source path '{}' is not a directory", m_SourcePath); - return 1; - } - - const std::string UrlBase = fmt::format("{}/prj", m_HostName); - - cpr::Session Session; - - { - ZEN_CONSOLE("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); - - zen::BasicFile ProjectStore; - ProjectStore.Open(GetProjectPath(m_SourcePath, m_ProjectName), zen::BasicFile::Mode::kRead); - zen::IoBuffer Payload = ProjectStore.ReadAll(); - - std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); - Session.SetUrl({ProjectRequest}); - Session.SetBody(cpr::Body{(const char*)Payload.GetData(), Payload.GetSize()}); - cpr::Response Response = Session.Post(); - if (!IsSuccess(Response, ProjectRequest)) - { - return 1; - } - if (m_OplogNames.empty()) - { - zen::DirectoryContent Content; - zen::GetDirectoryContent(m_SourcePath, zen::DirectoryContent::IncludeFilesFlag, Content); - for (auto& File : Content.Files) - { - if (File.extension() == ".ops") - { - m_OplogNames.push_back(File.stem().string()); - } - } - } - } - - zen::IoBuffer ChunkStoreIndex = zen::IoBufferBuilder::MakeFromFile(GetChunksIndexPath(m_SourcePath)); - zen::IoBuffer ChunkStoreHeaderMem(ChunkStoreIndex, 0, sizeof(ChunksHeader)); - const ChunksHeader* Header = reinterpret_cast<const ChunksHeader*>(ChunkStoreHeaderMem.GetView().GetData()); - - if (Header->Magic != ChunksHeader::kMagic) - { - ZEN_ERROR("Invalid chunk index header"); - return 1; - } - const size_t BlockSize = 1ull << Header->BlockSizeShift; - - zen::IoBuffer ChunkStoreEntriesMem(ChunkStoreIndex, sizeof(Header), sizeof(ChunkEntry) * Header->ChunkCount); - const ChunkEntry* ChunkEntries = reinterpret_cast<const ChunkEntry*>(ChunkStoreEntriesMem.GetView().GetData()); - - for (const std::string& OplogName : m_OplogNames) - { - ZEN_CONSOLE("Importing oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_SourcePath, m_HostName); - std::string GetOplogRequest = fmt::format("{}/{}/oplog/{}", UrlBase, m_ProjectName, OplogName); - Session.SetUrl(GetOplogRequest); - cpr::Response OplogResponse = Session.Get(); - if (OplogResponse.status_code == static_cast<long>(zen::HttpResponseCode::NotFound)) - { - OplogResponse = Session.Post(); - if (!IsSuccess(OplogResponse, GetOplogRequest)) - { - return 1; - } - IsSuccess(OplogResponse, GetOplogRequest); - OplogResponse = Session.Get(); - if (!IsSuccess(OplogResponse, GetOplogRequest)) - { - return 1; - } - } - - zen::BasicFile OpStore; - OpStore.Open(GetOplogPath(m_SourcePath, OplogName), zen::BasicFile::Mode::kRead); - OplogHeader OplogHeader; - OpStore.Read(&OplogHeader, sizeof(OplogHeader), 0); - if (OplogHeader.Magic != OplogHeader::kMagic || OplogHeader.HeaderSize != sizeof(OplogHeader)) - { - ZEN_ERROR("Invalid oplog header"); - return 1; - } - zen::IoHash Checksum = OplogHeader.Checksum; - std::vector<OplogEntry> OpEntries; - OpEntries.resize(OplogHeader.OpCount); - OpStore.Read(OpEntries.data(), sizeof(OplogEntry) * OplogHeader.OpCount, sizeof(OplogHeader)); - - ZEN_CONSOLE("Constructing oplog with {} ops with checksum '{}' for '{}/{}'", - OpEntries.size(), - OplogHeader.Checksum, - m_ProjectName, - OplogName); - zen::IoHashStream Hasher; - zen::CbObjectWriter Request; - Request.BeginArray("entries"sv); - for (auto& OpEntry : OpEntries) - { - zen::IoBuffer CoreData(OpEntry.OpLength); - OpStore.Read(CoreData.MutableData(), OpEntry.OpLength, OpEntry.Offset); - Hasher.Append(CoreData.GetView()); - zen::SharedBuffer SharedCoreData(CoreData); - - zen::CbObject Op(SharedCoreData); - Request << Op; - } - Request.EndArray(); - zen::IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return 1; - } - Request.AddHash("checksum"sv, Checksum); - - zen::CbPackage RequestPackage; - RequestPackage.SetObject(Request.Save()); - - ZEN_CONSOLE("Assembling {} attachments", Header->ChunkCount); - std::vector<zen::CbAttachment> Attachments; - Attachments.reserve(Header->ChunkCount); - uint32_t ReadBlockIndex = 0; - zen::IoBuffer BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex)); - for (uint64_t ChunkIndex = 0; ChunkIndex < Header->ChunkCount; ++ChunkIndex) - { - const ChunkEntry& ChunkEntry = ChunkEntries[ChunkIndex]; - if (ChunkEntry.Offset == ~0ull) - { - zen::IoBuffer ChunkBuffer = zen::IoBufferBuilder::MakeFromFile(GetLargeChunkPath(m_SourcePath, ChunkEntry.ChunkHash)); - zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - ZEN_ASSERT(Chunk); - Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); - } - else - { - uint32_t BlockIndex = gsl::narrow<uint32_t>(ChunkEntry.Offset / BlockSize); - if (BlockIndex != ReadBlockIndex) - { - ReadBlockIndex = BlockIndex; - BlockStore = zen::IoBufferBuilder::MakeFromFile(GetChunksPath(m_SourcePath, ReadBlockIndex)); - ZEN_ASSERT(BlockStore); - } - size_t BlockOffset = BlockIndex * BlockSize; - size_t AttachmentBulkOffset = ChunkEntry.Offset - BlockOffset; - zen::IoBuffer ChunkBuffer(BlockStore, AttachmentBulkOffset, ChunkEntry.Length); - zen::CompressedBuffer Chunk = zen::CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); - Attachments.push_back(zen::CbAttachment(Chunk, ChunkEntry.ChunkHash)); - } - } - RequestPackage.AddAttachments(Attachments); - - ZEN_CONSOLE("Sending oplog with {} ops and {} attachments for '{}/{}' to {}", - OpEntries.size(), - Header->ChunkCount, - m_ProjectName, - OplogName, - m_HostName); - std::vector<zen::IoBuffer> RequestPayload = zen::FormatPackageMessage(RequestPackage, zen::FormatFlags::kAllowLocalReferences); - std::vector<zen::SharedBuffer> Parts; - Parts.reserve(RequestPayload.size()); - for (const auto& I : RequestPayload) - { - Parts.emplace_back(zen::SharedBuffer(I)); - } - zen::CompositeBuffer Cmp(std::move(Parts)); - zen::CompressedBuffer CompressedRequest = zen::CompressedBuffer::Compress(Cmp); - - std::string AppendOplogRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); - Session.SetUrl(AppendOplogRequest); - - zen::IoBuffer TmpBuffer = CompressedRequest.GetCompressed().Flatten().AsIoBuffer(); - Session.SetBody(cpr::Body{(const char*)TmpBuffer.GetData(), TmpBuffer.GetSize()}); - cpr::Response Response = Session.Post(); - if (!IsSuccess(Response, AppendOplogRequest)) - { - return 1; - } - - ZEN_CONSOLE("Imported {} ops and {} chunks", OpEntries.size(), Header->ChunkCount); - } - return 0; -} diff --git a/zen/cmds/projectstore.h b/zen/cmds/projectstore.h index 1bdcbdfac..73cba8f66 100644 --- a/zen/cmds/projectstore.h +++ b/zen/cmds/projectstore.h @@ -70,37 +70,3 @@ private: std::string m_OplogId; std::string m_GcPath; }; - -class ExportProjectCommand : public ZenCmdBase -{ -public: - ExportProjectCommand(); - ~ExportProjectCommand(); - - virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; - virtual cxxopts::Options& Options() override { return m_Options; } - -private: - cxxopts::Options m_Options{"export-project", "Export one or more project oplogs to disk"}; - std::string m_HostName; - std::string m_TargetPath; - std::string m_ProjectName; - std::vector<std::string> m_OplogNames; -}; - -class ImportProjectCommand : public ZenCmdBase -{ -public: - ImportProjectCommand(); - ~ImportProjectCommand(); - - virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; - virtual cxxopts::Options& Options() override { return m_Options; } - -private: - cxxopts::Options m_Options{"import-project", "Import project oplogs from disk"}; - std::string m_HostName; - std::string m_SourcePath; - std::string m_ProjectName; - std::vector<std::string> m_OplogNames; -}; diff --git a/zen/zen.cpp b/zen/zen.cpp index d7b552814..2b6a529fe 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -215,8 +215,6 @@ main(int argc, char** argv) PsCommand PsCmd; UpCommand UpCmd; DownCommand DownCmd; - ExportProjectCommand ExportProjectCmd; - ImportProjectCommand ImportProjectCmd; VersionCommand VersionCmd; CacheInfoCommand CacheInfoCmd; DropProjectCommand ProjectDropCmd; @@ -241,9 +239,7 @@ main(int argc, char** argv) {"copy", &CopyCmd, "Copy file(s)"}, {"dedup", &DedupCmd, "Dedup files"}, {"drop", &DropCmd, "Drop cache namespace or bucket"}, - {"export-project", &ExportProjectCmd, "Export project store oplog"}, {"hash", &HashCmd, "Compute file hashes"}, - {"import-project", &ImportProjectCmd, "Import project store oplog"}, {"print", &PrintCmd, "Print compact binary object"}, {"printpackage", &PrintPkgCmd, "Print compact binary package"}, {"status", &StatusCmd, "Show zen status"}, diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index 9a17443a8..188865edb 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -2311,209 +2311,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) HttpVerb::kGet); m_Router.RegisterRoute( - "{project}/oplog/{log}/archive", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kGet: - { - CbObjectWriter Response; - Response.BeginArray("entries"sv); - std::unordered_set<IoHash> AttachementHashes; - size_t OpCount = 0; - IoHashStream Hasher; - - FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { - SharedBuffer Buffer = Op.GetBuffer(); - Hasher.Append(Buffer.GetView()); - Response << Op; - Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachementHashes.emplace(AttachmentHash); - }); - OpCount++; - }); - Response.EndArray(); - - IoHash Checksum = Hasher.GetHash(); - Response.AddHash("checksum"sv, Checksum); - - ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", - OpCount, - AttachementHashes.size(), - ProjectId, - OplogId, - Checksum); - - CbPackage ResponsePackage; - ResponsePackage.SetObject(Response.Save()); - - std::vector<CbAttachment> Attachments; - Attachments.reserve(AttachementHashes.size()); - for (const IoHash& AttachmentHash : AttachementHashes) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); - if (Payload) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); - ZEN_ASSERT(Compressed); - Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); - } - } - ResponsePackage.AddAttachments(Attachments); - - std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); - const ZenContentType AcceptType = HttpReq.AcceptContentType(); - if (AcceptType == ZenContentType::kCompressedBinary) - { - std::vector<SharedBuffer> Parts; - Parts.reserve(ResponsePayload.size()); - for (const auto& I : ResponsePayload) - { - Parts.emplace_back(SharedBuffer(I)); - } - CompositeBuffer Cmp(std::move(Parts)); - CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); - HttpReq.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCompressedBinary, - CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); - } - else if (AcceptType == ZenContentType::kCbPackage) - { - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - break; - case HttpVerb::kPost: - { - ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); - IoBuffer CompressedPayload = HttpReq.ReadPayload(); - IoBuffer Payload = - CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); - - CbPackage RequestPackage = ParsePackageMessage(Payload); - CbObject Request = RequestPackage.GetObject(); - IoHash Checksum = Request["checksum"sv].AsHash(); - - std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments(); - zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); - - ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", - Entries.Num(), - Attachments.size(), - Checksum, - ProjectId, - OplogId); - std::vector<CbObject> Ops; - Ops.reserve(Entries.Num()); - IoHashStream Hasher; - for (auto& OpEntry : Entries) - { - CbObjectView Core = OpEntry.AsObjectView(); - - if (!Core["key"sv]) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "No oplog entry key specified"); - } - - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - Hasher.Append(OpView); - IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - Ops.emplace_back(Op); - } - IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); - } - - ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); - for (const CbObject& Op : Ops) - { - const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); - ZEN_DEBUG("oplog entry #{}", OpLsn); - - if (OpLsn == ProjectStore::Oplog::kInvalidOp) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", - ProjectId, - OplogId, - OpLsn, - NiceBytes(Op.GetSize()), - Op["key"sv].AsString()); - } - - // Persist attachments after oplog entry so GC won't find attachments without references - ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); - - // We are creating a worker thread pool here since we are storing a lot of attachments in one go - // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. - WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); - Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())}; - for (const CbAttachment& Attachment : Attachments) - { - WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() { - try - { - CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(), - Attachment.GetHash(), - CidStore::InsertMode::kCopyOnly); - } - catch (std::exception& e) - { - ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'", - Attachment.GetHash(), - ProjectId, - OplogId, - e.what()); - } - JobCount.CountDown(); - }); - } - JobCount.Wait(); - - ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); - return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); - } - break; - default: - break; - } - }, - HttpVerb::kPost | HttpVerb::kGet); - m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); |