aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-08-24 12:38:51 +0200
committerGitHub <[email protected]>2023-08-24 12:38:51 +0200
commit26134d49cc2b5174cbee1ce0b8c1b023f5e451eb (patch)
treeec8c80b0f95c8fbe6d23aaebc517abc0d01a52fa /src
parentcrash in process cache (#375) (diff)
downloadzen-26134d49cc2b5174cbee1ce0b8c1b023f5e451eb.tar.xz
zen-26134d49cc2b5174cbee1ce0b8c1b023f5e451eb.zip
Add `--embedloosefiles` option to `oplog-export` (#376)
* Add `--embedloosefiles` option to `oplog-export` which adds loose files to the export, removing need to call `oplog-snapshot` * Retain `ServerPath` in oplog when performing `oplog-snapshot`. This is a short-term fix for current incompatability with the UE cooker.
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore.cpp10
-rw-r--r--src/zen/cmds/projectstore.h1
-rw-r--r--src/zenserver/projectstore/projectstore.cpp15
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp277
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h23
5 files changed, 253 insertions, 73 deletions
diff --git a/src/zen/cmds/projectstore.cpp b/src/zen/cmds/projectstore.cpp
index bc526ce8c..5814b9671 100644
--- a/src/zen/cmds/projectstore.cpp
+++ b/src/zen/cmds/projectstore.cpp
@@ -466,6 +466,12 @@ ExportOplogCommand::ExportOplogCommand()
"Max size for attachment to be bundled",
cxxopts::value(m_MaxChunkEmbedSize),
"<chunksize>");
+ m_Options.add_option("",
+ "",
+ "embedloosefiles",
+ "Export additional files referenced by path as attachments",
+ cxxopts::value(m_EmbedLooseFiles),
+ "<embedloosefiles>");
m_Options.add_option("", "f", "force", "Force export of all attachments", cxxopts::value(m_Force), "<force>");
m_Options.add_option("",
"",
@@ -665,6 +671,10 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddInteger("maxchunkembedsize"sv, m_MaxChunkEmbedSize);
}
+ if (m_EmbedLooseFiles)
+ {
+ Writer.AddBool("embedloosefiles"sv, true);
+ }
if (m_Force)
{
Writer.AddBool("force"sv, true);
diff --git a/src/zen/cmds/projectstore.h b/src/zen/cmds/projectstore.h
index d67aed9aa..6ab49becf 100644
--- a/src/zen/cmds/projectstore.h
+++ b/src/zen/cmds/projectstore.h
@@ -121,6 +121,7 @@ private:
std::string m_OplogName;
uint64_t m_MaxBlockSize = 0;
uint64_t m_MaxChunkEmbedSize = 0;
+ bool m_EmbedLooseFiles = false;
bool m_Force = false;
bool m_DisableBlocks = false;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 06d5221c4..c37b8c8d4 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -2322,13 +2322,15 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
ChunkStore,
+ *Project.Get(),
*Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
false,
[](CompressedBuffer&&, const IoHash) {},
[](const IoHash&) {},
- [](const std::unordered_set<IoHash, IoHash::Hasher>) {});
+ [](const std::unordered_set<IoHash, IoHash::Hasher>) {},
+ nullptr);
OutResponse = std::move(ContainerResult.ContainerObject);
return ConvertResult(ContainerResult);
@@ -2569,7 +2571,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
Oid ChunkId = View["id"sv].AsObjectId();
IoBuffer FileIoBuffer = DataFile.ReadAll();
- CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(FileIoBuffer));
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
const IoHash RawHash = Compressed.DecodeRawHash();
const uint64_t RawSize = Compressed.DecodeRawSize();
IoBuffer CompressedBuffer = Compressed.GetCompressed().Flatten().AsIoBuffer();
@@ -2592,12 +2594,6 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq,
// omit this field as we will write it explicitly ourselves
return true;
}
- else if (Field.GetName() == "serverpath"sv)
- {
- // omit this field as it's not relevant if there is a hash
- return true;
- }
-
return false;
});
Writer.AddBinaryAttachment("data"sv, RawHash);
@@ -2688,6 +2684,7 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
bool Force = Params["force"sv].AsBool(false);
+ bool EmbedLooseFile = Params["embedloosefiles"sv].AsBool(false);
std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
@@ -2708,9 +2705,11 @@ ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog,
RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
*RemoteStore,
+ Project,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
+ EmbedLooseFile,
StoreInfo.CreateBlocks,
StoreInfo.UseTempBlockFiles,
Force);
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 1abd18a5c..2806bc2d1 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -3,6 +3,7 @@
#include "remoteprojectstore.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compress.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
@@ -189,6 +190,7 @@ AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
CbObject
BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
@@ -197,6 +199,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&)>& OnLargeAttachment,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutLooseAttachments,
AsyncRemoteResult& RemoteResult)
{
using namespace std::literals;
@@ -217,20 +220,166 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
- size_t BlockSize = 0;
- std::vector<SharedBuffer> ChunksInBlock;
-
+ size_t BlockSize = 0;
+ std::vector<SharedBuffer> ChunksInBlock;
std::unordered_map<IoHash, int, IoHash::Hasher> Attachments;
- Oplog.IterateOplogWithKey([&Attachments, &SectionOpsWriter, &OpCount](int LSN, const Oid&, CbObject Op) {
+
+ auto RewriteOp = [&](int LSN, CbObject Op, const std::function<void(CbObject)>& CB) {
+ bool OpRewritten = false;
+ CbArrayView Files = Op["files"sv].AsArrayView();
+ if (Files.Num() == 0)
+ {
+ CB(Op);
+ return;
+ }
+
+ CbWriter Cbo;
+ Cbo.BeginArray("files"sv);
+
+ for (CbFieldView& Field : Files)
+ {
+ bool CopyField = true;
+
+ if (CbObjectView View = Field.AsObjectView())
+ {
+ IoHash DataHash = View["data"sv].AsHash();
+
+ if (DataHash == IoHash::Zero)
+ {
+ {
+ // Read file contents into memory and compress
+
+ std::string_view ServerPath = View["serverpath"sv].AsString();
+ std::filesystem::path FilePath = Project.RootDir / ServerPath;
+ BasicFile DataFile;
+ DataFile.Open(FilePath, BasicFile::Mode::kRead);
+
+ IoBuffer FileIoBuffer = DataFile.ReadAll();
+ DataFile.Close();
+
+ CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer)));
+
+ DataHash = Compressed.DecodeRawHash();
+ uint64_t PayloadSize = Compressed.GetCompressed().GetSize();
+ if (PayloadSize > MaxChunkEmbedSize)
+ {
+ // Write it out as a temporary file
+ IoBuffer AttachmentBuffer;
+ std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString();
+ if (std::filesystem::is_regular_file(AttachmentPath))
+ {
+ AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath);
+ if (AttachmentBuffer.GetSize() != PayloadSize)
+ {
+ AttachmentBuffer = IoBuffer{};
+ }
+ }
+ if (!AttachmentBuffer)
+ {
+ BasicFile BlockFile;
+ BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete);
+ uint64_t Offset = 0;
+ for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments())
+ {
+ BlockFile.Write(Buffer.GetView(), Offset);
+ Offset += Buffer.GetSize();
+ }
+ void* FileHandle = BlockFile.Detach();
+ AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+
+ AttachmentBuffer.MarkAsDeleteOnClose();
+ ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize));
+ }
+ OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer);
+ }
+ else
+ {
+ // If it is small we just hang on to the compressed buffer
+ OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer());
+ }
+ }
+
+ // Rewrite file array entry with new data reference
+ CbObjectWriter Writer;
+ RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool {
+ if (Field.GetName() == "data"sv)
+ {
+ // omit this field as we will write it explicitly ourselves
+ return true;
+ }
+ return false;
+ });
+ Writer.AddBinaryAttachment("data"sv, DataHash);
+
+ CbObject RewrittenOp = Writer.Save();
+ Cbo.AddObject(std::move(RewrittenOp));
+ CopyField = false;
+
+ Attachments.insert_or_assign(DataHash, LSN);
+ }
+ }
+
+ if (CopyField)
+ {
+ Cbo.AddField(Field);
+ }
+ else
+ {
+ OpRewritten = true;
+ }
+ }
+
+ if (!OpRewritten)
+ {
+ CB(Op);
+ return;
+ }
+
+ Cbo.EndArray();
+ CbArray FilesArray = Cbo.Save().AsArray();
+
+ CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool {
+ if (Field.GetName() == "files"sv)
+ {
+ NewWriter.AddArray("files"sv, FilesArray);
+
+ return true;
+ }
+
+ return false;
+ });
+ CB(RewrittenOp);
+ };
+
+ Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObject Op) {
Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert_or_assign(FieldView.AsAttachment(), LSN); });
- (SectionOpsWriter) << Op;
+ if (OutLooseAttachments != nullptr)
+ {
+ RewriteOp(LSN, Op, [&SectionOpsWriter](CbObject Op) { SectionOpsWriter << Op; });
+ }
+ else
+ {
+ SectionOpsWriter << Op;
+ }
OpCount++;
});
+ auto GetPayload = [&](const IoHash& AttachmentHash) {
+ if (OutLooseAttachments != nullptr)
+ {
+ auto PayloadIt = OutLooseAttachments->find(AttachmentHash);
+ if (PayloadIt != OutLooseAttachments->end())
+ {
+ return PayloadIt->second;
+ }
+ }
+ return ChunkStore.FindChunkByCid(AttachmentHash);
+ };
+
for (const auto& It : Attachments)
{
const IoHash& AttachmentHash(It.first);
- IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash);
+ IoBuffer Payload = GetPayload(AttachmentHash);
if (!Payload)
{
std::optional<CbObject> Op = Oplog.GetOpByIndex(It.second);
@@ -252,6 +401,7 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
+
uint64_t PayloadSize = Payload.GetSize();
if (PayloadSize > MaxChunkEmbedSize)
{
@@ -412,13 +562,15 @@ BuildContainer(CidStore& ChunkStore,
RemoteProjectStore::LoadContainerResult
BuildContainer(CidStore& ChunkStore,
+ ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
bool BuildBlocks,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks)
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments)
{
// We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a
// WorkerThreadPool alive
@@ -427,6 +579,7 @@ BuildContainer(CidStore& ChunkStore,
AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
+ Project,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
@@ -435,19 +588,22 @@ BuildContainer(CidStore& ChunkStore,
AsyncOnBlock,
OnLargeAttachment,
OnBlockChunks,
+ OutOptionalTempAttachments,
RemoteResult);
return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}
RemoteProjectStore::Result
-SaveOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- bool BuildBlocks,
- bool UseTempBlocks,
- bool ForceUpload)
+SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool EmbedLooseFiles,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload)
{
using namespace std::literals;
@@ -543,7 +699,9 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher> TempAttachments;
+ CbObject OplogContainerObject = BuildContainer(ChunkStore,
+ Project,
Oplog,
MaxBlockSize,
MaxChunkEmbedSize,
@@ -552,6 +710,7 @@ SaveOplog(CidStore& ChunkStore,
OnBlock,
OnLargeAttachment,
OnBlockChunks,
+ EmbedLooseFiles ? &TempAttachments : nullptr,
/* out */ RemoteResult);
if (!RemoteResult.IsError())
@@ -606,51 +765,56 @@ SaveOplog(CidStore& ChunkStore,
break;
}
SaveAttachmentsLatch.AddCount(1);
- WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks]() {
- auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
- if (RemoteResult.IsError())
- {
- return;
- }
+ WorkerPool.ScheduleWork(
+ [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &TempAttachments]() {
+ auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
- IoBuffer Payload;
- if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end())
- {
- Payload = std::move(It->second);
- }
- else
- {
- Payload = ChunkStore.FindChunkByCid(RawHash);
- }
- if (!Payload)
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
- fmt::format("Failed to find attachment {}", RawHash),
- {});
- ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
- RemoteResult.GetErrorReason(),
- RemoteResult.GetError());
- return;
- }
+ IoBuffer Payload;
+ if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
+ {
+ Payload = std::move(BlockIt->second);
+ }
+ else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end())
+ {
+ Payload = std::move(LooseTmpFileIt->second);
+ }
+ else
+ {
+ Payload = ChunkStore.FindChunkByCid(RawHash);
+ }
+ if (!Payload)
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+ fmt::format("Failed to find attachment {}", RawHash),
+ {});
+ ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+ RemoteResult.GetErrorReason(),
+ RemoteResult.GetError());
+ return;
+ }
- RemoteProjectStore::SaveAttachmentResult Result =
- RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
- if (Result.ErrorCode)
- {
- RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
- ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RemoteProjectStore::SaveAttachmentResult Result =
+ RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+ if (Result.ErrorCode)
+ {
+ RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+ ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+ RawHash,
+ NiceBytes(Payload.GetSize()),
+ RemoteResult.GetError(),
+ RemoteResult.GetErrorReason());
+ return;
+ }
+ ZEN_DEBUG("Saved attachment {}, {} in {}",
RawHash,
NiceBytes(Payload.GetSize()),
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason());
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
return;
- }
- ZEN_DEBUG("Saved attachment {}, {} in {}",
- RawHash,
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
- return;
- });
+ });
}
}
@@ -776,6 +940,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
}
SaveAttachmentsLatch.Wait();
+ TempAttachments.clear();
}
if (!RemoteResult.IsError())
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index 9b7d26dbe..6fb6e739e 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -81,13 +81,16 @@ struct RemoteStoreOptions
RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
+ ProjectStore::Project& Project,
ProjectStore::Oplog& Oplog,
size_t MaxBlockSize,
size_t MaxChunkEmbedSize,
bool BuildBlocks,
const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
const std::function<void(const IoHash&)>& OnLargeAttachment,
- const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks);
+ const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+ tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>*
+ OutOptionalTempAttachments); // Set OutOptionalTempAttachments to nullptr to avoid embedding loose "additional files"
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const CbObject& ContainerObject,
@@ -95,14 +98,16 @@ RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog,
const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
const std::function<void(const IoHash& RawHash)>& OnNeedAttachment);
-RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- size_t MaxBlockSize,
- size_t MaxChunkEmbedSize,
- bool BuildBlocks,
- bool UseTempBlocks,
- bool ForceUpload);
+RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
+ RemoteProjectStore& RemoteStore,
+ ProjectStore::Project& Project,
+ ProjectStore::Oplog& Oplog,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize,
+ bool EmbedLooseFiles,
+ bool BuildBlocks,
+ bool UseTempBlocks,
+ bool ForceUpload);
RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload);