diff options
| author | Stefan Boberg <[email protected]> | 2023-05-25 15:42:36 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-25 15:42:36 +0200 |
| commit | 3994bac926bcceb03a89ba74d164dcd687d70db6 (patch) | |
| tree | d967928c38dea7449381c85b1a6a60887964e491 /src | |
| parent | minor: refcount bump elimination (diff) | |
| download | zen-3994bac926bcceb03a89ba74d164dcd687d70db6.tar.xz zen-3994bac926bcceb03a89ba74d164dcd687d70db6.zip | |
oplog snapshot (#317)
Added "snapshot" oplog RPC
this may be used to bring referenced files into the local store instead of referencing them by filename, thus making the project/oplog transportable
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore.cpp | 34 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compactbinaryutil.h | 42 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 160 |
3 files changed, 219 insertions, 17 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp index 70670150a..e95b230b8 100644 --- a/src/zen/cmds/projectstore.cpp +++ b/src/zen/cmds/projectstore.cpp @@ -35,9 +35,12 @@ DropProjectCommand::DropProjectCommand() m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); m_Options.parse_positional({"project", "oplog"}); + m_Options.positional_help("[<projectid> [<oplogid>]]"); } -DropProjectCommand::~DropProjectCommand() = default; +DropProjectCommand::~DropProjectCommand() +{ +} int DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) @@ -102,9 +105,12 @@ ProjectInfoCommand::ProjectInfoCommand() m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>"); m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>"); m_Options.parse_positional({"project", "oplog"}); + m_Options.positional_help("[<projectid> [<oplogid>]]"); } -ProjectInfoCommand::~ProjectInfoCommand() = default; +ProjectInfoCommand::~ProjectInfoCommand() +{ +} int ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) @@ -279,14 +285,12 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg if (m_ProjectId.empty()) { - ZEN_ERROR("Project name must be given"); - return 1; + throw zen::OptionParseException("project name must be specified"); } if (m_OplogId.empty()) { - ZEN_ERROR("Oplog name must be given"); - return 1; + throw zen::OptionParseException("oplog name must be specified"); } Session.SetUrl({fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectId, m_OplogId)}); @@ -377,7 +381,9 @@ ExportOplogCommand::ExportOplogCommand() m_Options.parse_positional({"project", "oplog"}); } -ExportOplogCommand::~ExportOplogCommand() = default; +ExportOplogCommand::~ExportOplogCommand() +{ +} int ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) @@ -644,7 +650,9 @@ ImportOplogCommand::ImportOplogCommand() m_Options.parse_positional({"project", "oplog"}); } -ImportOplogCommand::~ImportOplogCommand() = default; +ImportOplogCommand::~ImportOplogCommand() +{ +} int ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) @@ -842,7 +850,9 @@ ProjectStatsCommand::ProjectStatsCommand() m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); } -ProjectStatsCommand::~ProjectStatsCommand() = default; +ProjectStatsCommand::~ProjectStatsCommand() +{ +} int ProjectStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) @@ -891,7 +901,7 @@ ProjectDetailsCommand::ProjectDetailsCommand() m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); m_Options.add_option("", "c", "csv", "Output in CSV format (default is JSon)", cxxopts::value(m_CSV), "<csv>"); - m_Options.add_option("", "d", "details", "Detailed info on opslog", cxxopts::value(m_Details), "<details>"); + m_Options.add_option("", "d", "details", "Detailed info on oplog", cxxopts::value(m_Details), "<details>"); m_Options.add_option("", "o", "opdetails", "Details info on oplog body", cxxopts::value(m_OpDetails), "<opdetails>"); m_Options.add_option("", "p", "project", "Project name to get info from", cxxopts::value(m_ProjectName), "<projectid>"); m_Options.add_option("", "l", "oplog", "Oplog name to get info from", cxxopts::value(m_OplogName), "<oplogid>"); @@ -904,7 +914,9 @@ ProjectDetailsCommand::ProjectDetailsCommand() "<attachmentdetails>"); } -ProjectDetailsCommand::~ProjectDetailsCommand() = default; +ProjectDetailsCommand::~ProjectDetailsCommand() +{ +} int ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) diff --git a/src/zencore/include/zencore/compactbinaryutil.h b/src/zencore/include/zencore/compactbinaryutil.h new file mode 100644 index 000000000..9d69266bf --- /dev/null +++ b/src/zencore/include/zencore/compactbinaryutil.h @@ -0,0 +1,42 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> + +namespace zen { + +/** Object rewrite helper + + This is meant to be used when you have a fully formed CbObject which you wish + to rewrite. Since the compact binary format is not meant for in-place updates + this involves scanning the object, copying the fields we want to keep and + writing new versions of the fields we wish to keep. + + The Rewriter function accepts a reference to the CbObjectWriter which is being + used to build the new version of the object and a reference to a field which is + considered for copy or rewriting. If the function wants to rewrite a field then + it'll use the writer to write a new field and then return `true`, otherwise just + return `false` to have RewriteCbObject copy the field into the new object. +*/ + +CbObject +RewriteCbObject(CbObjectView InObj, Invocable<CbObjectWriter&, CbFieldView&> auto Rewriter) +{ + CbObjectWriter CboWriter; + + for (CbFieldView InnerField : InObj) + { + if (!Rewriter(CboWriter, InnerField)) + { + CboWriter.AddField(InnerField.GetName(), InnerField); + } + } + + return CboWriter.Save(); +} + +} // namespace zen diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index deae58508..6ca43af0e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryutil.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -318,7 +319,7 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset); } - void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) + void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler) { for (const OplogEntryAddress& Entry : Entries) { @@ -2379,7 +2380,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, std::string_view Method = Cb["method"sv].AsString(); - if (Method == "import") + if (Method == "import"sv) { if (!AreDiskWritesAllowed()) { @@ -2392,7 +2393,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } - else if (Method == "export") + else if (Method == "export"sv) { std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) @@ -2401,7 +2402,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } - else if (Method == "getchunks") + else if (Method == "getchunks"sv) { CbPackage ResponsePackage; { @@ -2425,7 +2426,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - else if (Method == "putchunks") + else if (Method == "putchunks"sv) { if (!AreDiskWritesAllowed()) { @@ -2440,6 +2441,153 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(HttpResponseCode::OK); } + else if (Method == "snapshot"sv) + { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + // Snapshot all referenced files. This brings the content of all + // files into the CID store + + int OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + + std::vector<CbObject> NewOps; + + Oplog->IterateOplog([&](CbObject Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter Cbo; + Cbo.BeginArray("files"sv); + + for (CbFieldView& Field : Op["files"sv]) + { + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + const IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + // Error... + + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + + AllOk = false; + } + else + { + // Read file contents into memory, compress and store in CidStore + + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer)); + const IoHash RawHash = Compressed.DecodeRawHash(); + const uint64_t RawSize = Compressed.DecodeRawSize(); + IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer(); + CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash); + + TotalBytes += RawSize; + ++TotalFiles; + + if (Result.New) + { + InlinedBytes += RawSize; + ++InlinedFiles; + } + + // Rewrite file array entry with new data reference + + CbObject RewrittenOp = RewriteCbObject(View, [&](CbObjectWriter& Writer, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + Writer.AddBinaryAttachment("data"sv, RawHash); + + return true; + } + + return false; + }); + + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; + } + } + } + + if (CopyField) + { + Cbo.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (OpRewritten && AllOk) + { + Cbo.EndArray(); + CbArray FilesArray = Cbo.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) + { + uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn); + + ResponseObj.AddInteger(NewLsn); + } + + ResponseObj.EndArray(); + } + + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + + ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount); + + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + } return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); } @@ -2578,7 +2726,7 @@ namespace testutils { return Result; } - uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) { if (RawOffset > 0) { |