aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-25 15:42:36 +0200
committerGitHub <[email protected]>2023-05-25 15:42:36 +0200
commit3994bac926bcceb03a89ba74d164dcd687d70db6 (patch)
treed967928c38dea7449381c85b1a6a60887964e491 /src/zenserver
parentminor: refcount bump elimination (diff)
downloadzen-3994bac926bcceb03a89ba74d164dcd687d70db6.tar.xz
zen-3994bac926bcceb03a89ba74d164dcd687d70db6.zip
oplog snapshot (#317)
Added "snapshot" oplog RPC this may be used to bring referenced files into the local store instead of referencing them by filename, thus making the project/oplog transportable
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp160
1 files changed, 154 insertions, 6 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index deae58508..6ca43af0e 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -4,6 +4,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -318,7 +319,7 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset);
}
- void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler)
{
for (const OplogEntryAddress& Entry : Entries)
{
@@ -2379,7 +2380,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
std::string_view Method = Cb["method"sv].AsString();
- if (Method == "import")
+ if (Method == "import"sv)
{
if (!AreDiskWritesAllowed())
{
@@ -2392,7 +2393,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
}
- else if (Method == "export")
+ else if (Method == "export"sv)
{
std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
if (Result.second.empty())
@@ -2401,7 +2402,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
}
- else if (Method == "getchunks")
+ else if (Method == "getchunks"sv)
{
CbPackage ResponsePackage;
{
@@ -2425,7 +2426,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- else if (Method == "putchunks")
+ else if (Method == "putchunks"sv)
{
if (!AreDiskWritesAllowed())
{
@@ -2440,6 +2441,153 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
}
return HttpReq.WriteResponse(HttpResponseCode::OK);
}
+ else if (Method == "snapshot"sv)
+ {
+ if (!AreDiskWritesAllowed())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage);
+ }
+
+ // Snapshot all referenced files. This brings the content of all
+ // files into the CID store
+
+ int OpCount = 0;
+ uint64_t InlinedBytes = 0;
+ uint64_t InlinedFiles = 0;
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+
+ std::vector<CbObject> NewOps;
+
+ Oplog->IterateOplog([&](CbObject Op) {
+ bool OpRewritten = false;
+ bool AllOk = true;
+
+ CbWriter Cbo;
+ Cbo.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Op["files"sv])
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ const IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project->RootDir / ServerPath;
+ BasicFile DataFile;
+ std::error_code Ec;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec);
+
+ if (Ec)
+ {
+ // Error...
+
+ ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message());
+
+ AllOk = false;
+ }
+ else
+ {
+ // Read file contents into memory, compress and store in CidStore
+
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer));
+ const IoHash RawHash = Compressed.DecodeRawHash();
+ const uint64_t RawSize = Compressed.DecodeRawSize();
+ IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash);
+
+ TotalBytes += RawSize;
+ ++TotalFiles;
+
+ if (Result.New)
+ {
+ InlinedBytes += RawSize;
+ ++InlinedFiles;
+ }
+
+ // Rewrite file array entry with new data reference
+
+ CbObject RewrittenOp = RewriteCbObject(View, [&](CbObjectWriter& Writer, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ Writer.AddBinaryAttachment("data"sv, RawHash);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+ }
+ }
+ }
+
+ if (CopyField)
+ {
+ Cbo.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (OpRewritten && AllOk)
+ {
+ Cbo.EndArray();
+ CbArray FilesArray = Cbo.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+
+ NewOps.push_back(std::move(RewrittenOp));
+ }
+
+ OpCount++;
+ });
+
+ CbObjectWriter ResponseObj;
+
+ // Persist rewritten oplog entries
+
+ if (!NewOps.empty())
+ {
+ ResponseObj.BeginArray("rewritten_ops");
+
+ for (CbObject& NewOp : NewOps)
+ {
+ uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp));
+
+ ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn);
+
+ ResponseObj.AddInteger(NewLsn);
+ }
+
+ ResponseObj.EndArray();
+ }
+
+ ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles;
+ ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles;
+
+ ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save());
+ }
return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
}
@@ -2578,7 +2726,7 @@ namespace testutils {
return Result;
}
- uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
+ uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset)
{
if (RawOffset > 0)
{