aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-25 15:42:36 +0200
committerGitHub <[email protected]>2023-05-25 15:42:36 +0200
commit3994bac926bcceb03a89ba74d164dcd687d70db6 (patch)
treed967928c38dea7449381c85b1a6a60887964e491 /src
parentminor: refcount bump elimination (diff)
downloadzen-3994bac926bcceb03a89ba74d164dcd687d70db6.tar.xz
zen-3994bac926bcceb03a89ba74d164dcd687d70db6.zip
oplog snapshot (#317)
Added "snapshot" oplog RPC this may be used to bring referenced files into the local store instead of referencing them by filename, thus making the project/oplog transportable
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore.cpp34
-rw-r--r--src/zencore/include/zencore/compactbinaryutil.h42
-rw-r--r--src/zenserver/projectstore/projectstore.cpp160
3 files changed, 219 insertions, 17 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp
index 70670150a..e95b230b8 100644
--- a/src/zen/cmds/projectstore.cpp
+++ b/src/zen/cmds/projectstore.cpp
@@ -35,9 +35,12 @@ DropProjectCommand::DropProjectCommand()
m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
m_Options.parse_positional({"project", "oplog"});
+ m_Options.positional_help("[<projectid> [<oplogid>]]");
}
-DropProjectCommand::~DropProjectCommand() = default;
+DropProjectCommand::~DropProjectCommand()
+{
+}
int
DropProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
@@ -102,9 +105,12 @@ ProjectInfoCommand::ProjectInfoCommand()
m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectid>");
m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogName), "<oplogid>");
m_Options.parse_positional({"project", "oplog"});
+ m_Options.positional_help("[<projectid> [<oplogid>]]");
}
-ProjectInfoCommand::~ProjectInfoCommand() = default;
+ProjectInfoCommand::~ProjectInfoCommand()
+{
+}
int
ProjectInfoCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
@@ -279,14 +285,12 @@ CreateOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
if (m_ProjectId.empty())
{
- ZEN_ERROR("Project name must be given");
- return 1;
+ throw zen::OptionParseException("project name must be specified");
}
if (m_OplogId.empty())
{
- ZEN_ERROR("Oplog name must be given");
- return 1;
+ throw zen::OptionParseException("oplog name must be specified");
}
Session.SetUrl({fmt::format("{}/prj/{}/oplog/{}", m_HostName, m_ProjectId, m_OplogId)});
@@ -377,7 +381,9 @@ ExportOplogCommand::ExportOplogCommand()
m_Options.parse_positional({"project", "oplog"});
}
-ExportOplogCommand::~ExportOplogCommand() = default;
+ExportOplogCommand::~ExportOplogCommand()
+{
+}
int
ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
@@ -644,7 +650,9 @@ ImportOplogCommand::ImportOplogCommand()
m_Options.parse_positional({"project", "oplog"});
}
-ImportOplogCommand::~ImportOplogCommand() = default;
+ImportOplogCommand::~ImportOplogCommand()
+{
+}
int
ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
@@ -842,7 +850,9 @@ ProjectStatsCommand::ProjectStatsCommand()
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
}
-ProjectStatsCommand::~ProjectStatsCommand() = default;
+ProjectStatsCommand::~ProjectStatsCommand()
+{
+}
int
ProjectStatsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
@@ -891,7 +901,7 @@ ProjectDetailsCommand::ProjectDetailsCommand()
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>");
m_Options.add_option("", "c", "csv", "Output in CSV format (default is JSon)", cxxopts::value(m_CSV), "<csv>");
- m_Options.add_option("", "d", "details", "Detailed info on opslog", cxxopts::value(m_Details), "<details>");
+ m_Options.add_option("", "d", "details", "Detailed info on oplog", cxxopts::value(m_Details), "<details>");
m_Options.add_option("", "o", "opdetails", "Details info on oplog body", cxxopts::value(m_OpDetails), "<opdetails>");
m_Options.add_option("", "p", "project", "Project name to get info from", cxxopts::value(m_ProjectName), "<projectid>");
m_Options.add_option("", "l", "oplog", "Oplog name to get info from", cxxopts::value(m_OplogName), "<oplogid>");
@@ -904,7 +914,9 @@ ProjectDetailsCommand::ProjectDetailsCommand()
"<attachmentdetails>");
}
-ProjectDetailsCommand::~ProjectDetailsCommand() = default;
+ProjectDetailsCommand::~ProjectDetailsCommand()
+{
+}
int
ProjectDetailsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
diff --git a/src/zencore/include/zencore/compactbinaryutil.h b/src/zencore/include/zencore/compactbinaryutil.h
new file mode 100644
index 000000000..9d69266bf
--- /dev/null
+++ b/src/zencore/include/zencore/compactbinaryutil.h
@@ -0,0 +1,42 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+
+namespace zen {
+
+/** Object rewrite helper
+
+ This is meant to be used when you have a fully formed CbObject which you wish
+ to rewrite. Since the compact binary format is not meant for in-place updates
+ this involves scanning the object, copying the fields we want to keep and
+ writing new versions of the fields we wish to keep.
+
+ The Rewriter function accepts a reference to the CbObjectWriter which is being
+ used to build the new version of the object and a reference to a field which is
+ considered for copy or rewriting. If the function wants to rewrite a field then
+ it'll use the writer to write a new field and then return `true`, otherwise just
+ return `false` to have RewriteCbObject copy the field into the new object.
+*/
+
+CbObject
+RewriteCbObject(CbObjectView InObj, Invocable<CbObjectWriter&, CbFieldView&> auto Rewriter)
+{
+ CbObjectWriter CboWriter;
+
+ for (CbFieldView InnerField : InObj)
+ {
+ if (!Rewriter(CboWriter, InnerField))
+ {
+ CboWriter.AddField(InnerField.GetName(), InnerField);
+ }
+ }
+
+ return CboWriter.Save();
+}
+
+} // namespace zen
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index deae58508..6ca43af0e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -318,7 +319,7 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset);
}
- void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler)
{
for (const OplogEntryAddress& Entry : Entries)
{
@@ -2379,7 +2380,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
std::string_view Method = Cb["method"sv].AsString();
- if (Method == "import")
+ if (Method == "import"sv)
{
if (!AreDiskWritesAllowed())
{
@@ -2392,7 +2393,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
}
- else if (Method == "export")
+ else if (Method == "export"sv)
{
std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
if (Result.second.empty())
@@ -2401,7 +2402,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
}
- else if (Method == "getchunks")
+ else if (Method == "getchunks"sv)
{
CbPackage ResponsePackage;
{
@@ -2425,7 +2426,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- else if (Method == "putchunks")
+ else if (Method == "putchunks"sv)
{
if (!AreDiskWritesAllowed())
{
@@ -2440,6 +2441,153 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(HttpResponseCode::OK);
}
+ else if (Method == "snapshot"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
+
+ int OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+
+ std::vector<CbObject> NewOps;
+
+ Oplog->IterateOplog([&](CbObject Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter Cbo;
+ Cbo.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Op["files"sv])
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ const IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
+ AllOk = false;
+ }
+ else
+ {
+ // Read file contents into memory, compress and store in CidStore
+
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer));
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash);
+
+ TotalBytes += RawSize;
+ ++TotalFiles;
+
+ if (Result.New)
+ {
+ InlinedBytes += RawSize;
+ ++InlinedFiles;
+ }
+
+ // Rewrite file array entry with new data reference
+
+ CbObject RewrittenOp = RewriteCbObject(View, [&](CbObjectWriter& Writer, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+ }
+
+ if (CopyField)
+ {
+ Cbo.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (OpRewritten && AllOk)
+ {
+ Cbo.EndArray();
+ CbArray FilesArray = Cbo.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ });
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
+ {
+ uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn);
+
+ ResponseObj.AddInteger(NewLsn);
+ }
+
+ ResponseObj.EndArray();
+ }
+
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+
+ ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ }
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
}
@@ -2578,7 +2726,7 @@ namespace testutils {
return Result;
}
- uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
{
if (RawOffset > 0)
{