diff options
| author | Stefan Boberg <[email protected]> | 2023-05-25 15:42:36 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-25 15:42:36 +0200 |
| commit | 3994bac926bcceb03a89ba74d164dcd687d70db6 (patch) | |
| tree | d967928c38dea7449381c85b1a6a60887964e491 /src/zenserver | |
| parent | minor: refcount bump elimination (diff) | |
| download | zen-3994bac926bcceb03a89ba74d164dcd687d70db6.tar.xz zen-3994bac926bcceb03a89ba74d164dcd687d70db6.zip | |
oplog snapshot (#317)
Added "snapshot" oplog RPC
this may be used to bring referenced files into the local store instead of referencing them by filename, thus making the project/oplog transportable
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 160 |
1 files changed, 154 insertions, 6 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index deae58508..6ca43af0e 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -4,6 +4,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryutil.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -318,7 +319,7 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset); } - void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) + void ReplayLog(const std::span<OplogEntryAddress> Entries, std::function<void(CbObject)>&& Handler) { for (const OplogEntryAddress& Entry : Entries) { @@ -2379,7 +2380,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, std::string_view Method = Cb["method"sv].AsString(); - if (Method == "import") + if (Method == "import"sv) { if (!AreDiskWritesAllowed()) { @@ -2392,7 +2393,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } - else if (Method == "export") + else if (Method == "export"sv) { std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager); if (Result.second.empty()) @@ -2401,7 +2402,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second); } - else if (Method == "getchunks") + else if (Method == "getchunks"sv) { CbPackage ResponsePackage; { @@ -2425,7 +2426,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault); return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - else if (Method == "putchunks") + else if (Method == "putchunks"sv) { if (!AreDiskWritesAllowed()) { @@ -2440,6 +2441,153 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } return HttpReq.WriteResponse(HttpResponseCode::OK); } + else if (Method == "snapshot"sv) + { + if (!AreDiskWritesAllowed()) + { + return HttpReq.WriteResponse(HttpResponseCode::InsufficientStorage); + } + + // Snapshot all referenced files. This brings the content of all + // files into the CID store + + int OpCount = 0; + uint64_t InlinedBytes = 0; + uint64_t InlinedFiles = 0; + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + + std::vector<CbObject> NewOps; + + Oplog->IterateOplog([&](CbObject Op) { + bool OpRewritten = false; + bool AllOk = true; + + CbWriter Cbo; + Cbo.BeginArray("files"sv); + + for (CbFieldView& Field : Op["files"sv]) + { + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + const IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + std::string_view ServerPath = View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project->RootDir / ServerPath; + BasicFile DataFile; + std::error_code Ec; + DataFile.Open(FilePath, BasicFile::Mode::kRead, Ec); + + if (Ec) + { + // Error... + + ZEN_ERROR("unable to read data from file '{}': {}", FilePath, Ec.message()); + + AllOk = false; + } + else + { + // Read file contents into memory, compress and store in CidStore + + IoBuffer FileIoBuffer = DataFile.ReadAll(); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer)); + const IoHash RawHash = Compressed.DecodeRawHash(); + const uint64_t RawSize = Compressed.DecodeRawSize(); + IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer(); + CidStore::InsertResult Result = m_CidStore.AddChunk(CompressedBuffer, RawHash); + + TotalBytes += RawSize; + ++TotalFiles; + + if (Result.New) + { + InlinedBytes += RawSize; + ++InlinedFiles; + } + + // Rewrite file array entry with new data reference + + CbObject RewrittenOp = RewriteCbObject(View, [&](CbObjectWriter& Writer, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + Writer.AddBinaryAttachment("data"sv, RawHash); + + return true; + } + + return false; + }); + + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; + } + } + } + + if (CopyField) + { + Cbo.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (OpRewritten && AllOk) + { + Cbo.EndArray(); + CbArray FilesArray = Cbo.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + + NewOps.push_back(std::move(RewrittenOp)); + } + + OpCount++; + }); + + CbObjectWriter ResponseObj; + + // Persist rewritten oplog entries + + if (!NewOps.empty()) + { + ResponseObj.BeginArray("rewritten_ops"); + + for (CbObject& NewOp : NewOps) + { + uint32_t NewLsn = Oplog->AppendNewOplogEntry(std::move(NewOp)); + + ZEN_DEBUG("appended rewritten op at LSN: {}", NewLsn); + + ResponseObj.AddInteger(NewLsn); + } + + ResponseObj.EndArray(); + } + + ResponseObj << "inlined_bytes" << InlinedBytes << "inlined_files" << InlinedFiles; + ResponseObj << "total_bytes" << TotalBytes << "total_files" << TotalFiles; + + ZEN_INFO("rewrote {} oplog entries (out of {})", NewOps.size(), OpCount); + + return HttpReq.WriteResponse(HttpResponseCode::OK, ResponseObj.Save()); + } return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method)); } @@ -2578,7 +2726,7 @@ namespace testutils { return Result; } - uint64 GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) + uint64_t GetCompressedOffset(const CompressedBuffer& Buffer, uint64 RawOffset) { if (RawOffset > 0) { |