diff options
| author | Dan Engelbrecht <[email protected]> | 2023-08-24 12:38:51 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-08-24 12:38:51 +0200 |
| commit | 26134d49cc2b5174cbee1ce0b8c1b023f5e451eb (patch) | |
| tree | ec8c80b0f95c8fbe6d23aaebc517abc0d01a52fa /src | |
| parent | crash in process cache (#375) (diff) | |
| download | zen-26134d49cc2b5174cbee1ce0b8c1b023f5e451eb.tar.xz zen-26134d49cc2b5174cbee1ce0b8c1b023f5e451eb.zip | |
Add `--embedloosefiles` option to `oplog-export` (#376)
* Add `--embedloosefiles` option to `oplog-export` which adds loose files to the export, removing the need to call `oplog-snapshot`
* Retain `ServerPath` in oplog when performing `oplog-snapshot`. This is a short-term fix for the current incompatibility with the UE cooker.
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore.cpp | 10 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore.h | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 15 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 277 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 23 |
5 files changed, 253 insertions, 73 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp index bc526ce8c..5814b9671 100644 --- a/src/zen/cmds/projectstore.cpp +++ b/src/zen/cmds/projectstore.cpp @@ -466,6 +466,12 @@ ExportOplogCommand::ExportOplogCommand() "Max size for attachment to be bundled", cxxopts::value(m_MaxChunkEmbedSize), "<chunksize>"); + m_Options.add_option("", + "", + "embedloosefiles", + "Export additional files referenced by path as attachments", + cxxopts::value(m_EmbedLooseFiles), + "<embedloosefiles>"); m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>"); m_Options.add_option("", "", @@ -665,6 +671,10 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize); } + if (m_EmbedLooseFiles) + { + Writer.AddBool("embedloosefiles"sv, true); + } if (m_Force) { Writer.AddBool("force"sv, true); diff --git a/src/zen/cmds/projectstore.h b/src/zen/cmds/projectstore.h index d67aed9aa..6ab49becf 100644 --- a/src/zen/cmds/projectstore.h +++ b/src/zen/cmds/projectstore.h @@ -121,6 +121,7 @@ private: std::string m_OplogName; uint64_t m_MaxBlockSize = 0; uint64_t m_MaxChunkEmbedSize = 0; + bool m_EmbedLooseFiles = false; bool m_Force = false; bool m_DisableBlocks = false; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 06d5221c4..c37b8c8d4 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -2322,13 +2322,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( ChunkStore, + *Project.Get(), *Oplog, MaxBlockSize, MaxChunkEmbedSize, false, [](CompressedBuffer&&, const IoHash) {}, [](const IoHash&) {}, - [](const std::unordered_set<IoHash, IoHash::Hasher>) {}); + [](const std::unordered_set<IoHash, IoHash::Hasher>) {}, + nullptr); 
OutResponse = std::move(ContainerResult.ContainerObject); return ConvertResult(ContainerResult); @@ -2569,7 +2571,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, Oid ChunkId = View["id"sv].AsObjectId(); IoBuffer FileIoBuffer = DataFile.ReadAll(); - CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer)); + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); const IoHash RawHash = Compressed.DecodeRawHash(); const uint64_t RawSize = Compressed.DecodeRawSize(); IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer(); @@ -2592,12 +2594,6 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, // omit this field as we will write it explicitly ourselves return true; } - else if (Field.GetName() == "serverpath"sv) - { - // omit this field as it's not relevant if there is a hash - return true; - } - return false; }); Writer.AddBinaryAttachment("data"sv, RawHash); @@ -2688,6 +2684,7 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u); size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u); bool Force = Params["force"sv].AsBool(false); + bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false); std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult = CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize); @@ -2708,9 +2705,11 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, RemoteProjectStore::Result Result = SaveOplog(m_CidStore, *RemoteStore, + Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, + EmbedLooseFile, StoreInfo.CreateBlocks, StoreInfo.UseTempBlockFiles, Force); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 1abd18a5c..2806bc2d1 100644 --- 
a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -3,6 +3,7 @@ #include "remoteprojectstore.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> #include <zencore/compress.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -189,6 +190,7 @@ AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks) CbObject BuildContainer(CidStore& ChunkStore, + ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, @@ -197,6 +199,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&)>& OnLargeAttachment, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments, AsyncRemoteResult& RemoteResult) { using namespace std::literals; @@ -217,20 +220,166 @@ BuildContainer(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes; - size_t BlockSize = 0; - std::vector<SharedBuffer> ChunksInBlock; - + size_t BlockSize = 0; + std::vector<SharedBuffer> ChunksInBlock; std::unordered_map<IoHash, int, IoHash::Hasher> Attachments; - Oplog.IterateOplogWithKey([&Attachments, &SectionOpsWriter, &OpCount](int LSN, const Oid&, CbObject Op) { + + auto RewriteOp = [&](int LSN, CbObject Op, const std::function<void(CbObject)>& CB) { + bool OpRewritten = false; + CbArrayView Files = Op["files"sv].AsArrayView(); + if (Files.Num() == 0) + { + CB(Op); + return; + } + + CbWriter Cbo; + Cbo.BeginArray("files"sv); + + for (CbFieldView& Field : Files) + { + bool CopyField = true; + + if (CbObjectView View = Field.AsObjectView()) + { + IoHash DataHash = View["data"sv].AsHash(); + + if (DataHash == IoHash::Zero) + { + { + // Read file contents into memory and compress + + std::string_view ServerPath = 
View["serverpath"sv].AsString(); + std::filesystem::path FilePath = Project.RootDir / ServerPath; + BasicFile DataFile; + DataFile.Open(FilePath, BasicFile::Mode::kRead); + + IoBuffer FileIoBuffer = DataFile.ReadAll(); + DataFile.Close(); + + CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); + + DataHash = Compressed.DecodeRawHash(); + uint64_t PayloadSize = Compressed.GetCompressed().GetSize(); + if (PayloadSize > MaxChunkEmbedSize) + { + // Write it out as a temporary file + IoBuffer AttachmentBuffer; + std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString(); + if (std::filesystem::is_regular_file(AttachmentPath)) + { + AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); + if (AttachmentBuffer.GetSize() != PayloadSize) + { + AttachmentBuffer = IoBuffer{}; + } + } + if (!AttachmentBuffer) + { + BasicFile BlockFile; + BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete); + uint64_t Offset = 0; + for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments()) + { + BlockFile.Write(Buffer.GetView(), Offset); + Offset += Buffer.GetSize(); + } + void* FileHandle = BlockFile.Detach(); + AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset); + + AttachmentBuffer.MarkAsDeleteOnClose(); + ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize)); + } + OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer); + } + else + { + // If it is small we just hang on to the compressed buffer + OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer()); + } + } + + // Rewrite file array entry with new data reference + CbObjectWriter Writer; + RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { + if (Field.GetName() == "data"sv) + { + // omit this field as we will write it explicitly ourselves + return true; + } + return false; + }); + 
Writer.AddBinaryAttachment("data"sv, DataHash); + + CbObject RewrittenOp = Writer.Save(); + Cbo.AddObject(std::move(RewrittenOp)); + CopyField = false; + + Attachments.insert_or_assign(DataHash, LSN); + } + } + + if (CopyField) + { + Cbo.AddField(Field); + } + else + { + OpRewritten = true; + } + } + + if (!OpRewritten) + { + CB(Op); + return; + } + + Cbo.EndArray(); + CbArray FilesArray = Cbo.Save().AsArray(); + + CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { + if (Field.GetName() == "files"sv) + { + NewWriter.AddArray("files"sv, FilesArray); + + return true; + } + + return false; + }); + CB(RewrittenOp); + }; + + Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) { Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert_or_assign(FieldView.AsAttachment(), LSN); }); - (SectionOpsWriter) << Op; + if (OutLooseAttachments != nullptr) + { + RewriteOp(LSN, Op, [&SectionOpsWriter](CbObject Op) { SectionOpsWriter << Op; }); + } + else + { + SectionOpsWriter << Op; + } OpCount++; }); + auto GetPayload = [&](const IoHash& AttachmentHash) { + if (OutLooseAttachments != nullptr) + { + auto PayloadIt = OutLooseAttachments->find(AttachmentHash); + if (PayloadIt != OutLooseAttachments->end()) + { + return PayloadIt->second; + } + } + return ChunkStore.FindChunkByCid(AttachmentHash); + }; + for (const auto& It : Attachments) { const IoHash& AttachmentHash(It.first); - IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); + IoBuffer Payload = GetPayload(AttachmentHash); if (!Payload) { std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second); @@ -252,6 +401,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } + uint64_t PayloadSize = Payload.GetSize(); if (PayloadSize > MaxChunkEmbedSize) { @@ -412,13 +562,15 @@ BuildContainer(CidStore& ChunkStore, RemoteProjectStore::LoadContainerResult BuildContainer(CidStore& ChunkStore, + ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, 
size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks) + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments) { // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a // WorkerThreadPool alive @@ -427,6 +579,7 @@ BuildContainer(CidStore& ChunkStore, AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, + Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, @@ -435,19 +588,22 @@ BuildContainer(CidStore& ChunkStore, AsyncOnBlock, OnLargeAttachment, OnBlockChunks, + OutOptionalTempAttachments, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } RemoteProjectStore::Result -SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - bool UseTempBlocks, - bool ForceUpload) +SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool EmbedLooseFiles, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload) { using namespace std::literals; @@ -543,7 +699,9 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - CbObject OplogContainerObject = BuildContainer(ChunkStore, + tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> TempAttachments; + CbObject OplogContainerObject = BuildContainer(ChunkStore, + Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, @@ -552,6 +710,7 @@ SaveOplog(CidStore& ChunkStore, OnBlock, 
OnLargeAttachment, OnBlockChunks, + EmbedLooseFiles ? &TempAttachments : nullptr, /* out */ RemoteResult); if (!RemoteResult.IsError()) @@ -606,51 +765,56 @@ SaveOplog(CidStore& ChunkStore, break; } SaveAttachmentsLatch.AddCount(1); - WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() { - auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); - if (RemoteResult.IsError()) - { - return; - } + WorkerPool.ScheduleWork( + [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() { + auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); + if (RemoteResult.IsError()) + { + return; + } - IoBuffer Payload; - if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end()) - { - Payload = std::move(It->second); - } - else - { - Payload = ChunkStore.FindChunkByCid(RawHash); - } - if (!Payload) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to find attachment {}", RawHash), - {}); - ZEN_ERROR("Failed to build container ({}). Reason: '{}'", - RemoteResult.GetErrorReason(), - RemoteResult.GetError()); - return; - } + IoBuffer Payload; + if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) + { + Payload = std::move(BlockIt->second); + } + else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) + { + Payload = std::move(LooseTmpFileIt->second); + } + else + { + Payload = ChunkStore.FindChunkByCid(RawHash); + } + if (!Payload) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), + fmt::format("Failed to find attachment {}", RawHash), + {}); + ZEN_ERROR("Failed to build container ({}). 
Reason: '{}'", + RemoteResult.GetErrorReason(), + RemoteResult.GetError()); + return; + } - RemoteProjectStore::SaveAttachmentResult Result = - RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); - if (Result.ErrorCode) - { - RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); - ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RemoteProjectStore::SaveAttachmentResult Result = + RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); + if (Result.ErrorCode) + { + RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); + ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'", + RawHash, + NiceBytes(Payload.GetSize()), + RemoteResult.GetError(), + RemoteResult.GetErrorReason()); + return; + } + ZEN_DEBUG("Saved attachment {}, {} in {}", RawHash, NiceBytes(Payload.GetSize()), - RemoteResult.GetError(), - RemoteResult.GetErrorReason()); + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); return; - } - ZEN_DEBUG("Saved attachment {}, {} in {}", - RawHash, - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))); - return; - }); + }); } } @@ -776,6 +940,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining()); } SaveAttachmentsLatch.Wait(); + TempAttachments.clear(); } if (!RemoteResult.IsError()) diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index 9b7d26dbe..6fb6e739e 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -81,13 +81,16 @@ struct RemoteStoreOptions RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, + ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, const std::function<void(CompressedBuffer&&, const 
IoHash&)>& AsyncOnBlock, const std::function<void(const IoHash&)>& OnLargeAttachment, - const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks); + const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, + tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* + OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files" RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, @@ -95,14 +98,16 @@ RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock, const std::function<void(const IoHash& RawHash)>& OnNeedAttachment); -RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - size_t MaxBlockSize, - size_t MaxChunkEmbedSize, - bool BuildBlocks, - bool UseTempBlocks, - bool ForceUpload); +RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, + RemoteProjectStore& RemoteStore, + ProjectStore::Project& Project, + ProjectStore::Oplog& Oplog, + size_t MaxBlockSize, + size_t MaxChunkEmbedSize, + bool EmbedLooseFiles, + bool BuildBlocks, + bool UseTempBlocks, + bool ForceUpload); RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload); |