aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore/remoteprojectstore.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/projectstore/remoteprojectstore.cpp
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/projectstore/remoteprojectstore.cpp')
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp1036
1 files changed, 1036 insertions, 0 deletions
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
new file mode 100644
index 000000000..1e6ca51a1
--- /dev/null
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -0,0 +1,1036 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "remoteprojectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compress.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
+#include <zenstore/cidstore.h>
+
+namespace zen {
+
+/*
+ OplogContainer
+ Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller
+ {
+ CbArray("ops")
+ {
+ CbObject Op
+ (CbFieldType::BinaryAttachment Attachments[])
+ (OpData)
+ }
+ }
+ CbArray("blocks")
+ CbObject
+ CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File)
+ CbArray("chunks")
+ CbFieldType::Hash // Chunk hashes
+ CbArray("chunks") // Optional, only if we are not creating blocks (Zen)
+ CbFieldType::BinaryAttachment // Chunk attachment hashes
+
+ CompressedBinary ChunkBlock
+ {
+ VarUInt ChunkCount
+ VarUInt ChunkSizes[ChunkCount]
+ (uint8_t[chunksize])[ChunkCount]
+ }
+*/
+
+////////////////////////////// AsyncRemoteResult
+
+struct AsyncRemoteResult
+{
+ void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
+ {
+ int32_t Expected = 0;
+ if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
+ {
+ m_ErrorReason = ErrorReason;
+ m_ErrorText = ErrorText;
+ }
+ }
+ bool IsError() const { return m_ErrorCode.load() != 0; }
+ int GetError() const { return m_ErrorCode.load(); };
+ const std::string& GetErrorReason() const { return m_ErrorReason; };
+ const std::string& GetErrorText() const { return m_ErrorText; };
+ RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
+ {
+ return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
+ }
+
+private:
+ std::atomic<int32_t> m_ErrorCode = 0;
+ std::string m_ErrorReason;
+ std::string m_ErrorText;
+};
+
+// Decompresses a chunk block and calls Visitor once per contained chunk.
+//
+// Block layout after decompression: VarUInt chunk count, one VarUInt size per chunk, then the
+// chunk payloads back to back. Each payload must itself be a valid compressed buffer.
+//
+// Returns false (after logging) if the block cannot be decompressed, if its header or chunk
+// sizes do not fit inside the decompressed data, or if a chunk is not a valid compressed buffer.
+// Chunks preceding the invalid one have already been passed to Visitor at that point.
+bool
+IterateBlock(IoBuffer&& CompressedBlock, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+{
+    IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
+
+    MemoryView           BlockView  = BlockPayload.GetView();
+    const uint8_t* const BlockBegin = reinterpret_cast<const uint8_t*>(BlockView.GetData());
+    const uint8_t* const BlockEnd   = reinterpret_cast<const uint8_t*>(BlockView.GetDataEnd());
+    // The block comes from a remote store, so treat it as untrusted: an empty payload
+    // (e.g. failed decompression) cannot even hold the chunk count.
+    if (!BlockPayload || BlockBegin == nullptr || BlockBegin >= BlockEnd)
+    {
+        ZEN_ERROR("Invalid block, unable to decompress payload");
+        return false;
+    }
+
+    const uint8_t* ReadPtr   = BlockBegin;
+    const auto     Remaining = [&ReadPtr, BlockEnd]() { return static_cast<uint64_t>(BlockEnd - ReadPtr); };
+
+    uint32_t       NumberSize = 0;
+    const uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
+    if (NumberSize > Remaining())
+    {
+        ZEN_ERROR("Invalid block, truncated header");
+        return false;
+    }
+    ReadPtr += NumberSize;
+
+    // Every chunk size occupies at least one byte, so a count larger than the remaining data
+    // is malformed. Checking this before reserve() avoids a huge allocation on garbage input.
+    if (ChunkCount > Remaining())
+    {
+        ZEN_ERROR("Invalid block, chunk count exceeds block size");
+        return false;
+    }
+
+    std::vector<uint64_t> ChunkSizes;
+    ChunkSizes.reserve(ChunkCount);
+    for (uint64_t Index = 0; Index < ChunkCount; ++Index)
+    {
+        if (Remaining() == 0)
+        {
+            ZEN_ERROR("Invalid block, truncated header");
+            return false;
+        }
+        ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
+        if (NumberSize > Remaining())
+        {
+            ZEN_ERROR("Invalid block, truncated header");
+            return false;
+        }
+        ReadPtr += NumberSize;
+    }
+    ZEN_ASSERT(ReadPtr > BlockBegin);
+
+    for (uint64_t ChunkSize : ChunkSizes)
+    {
+        // Validate before wrapping the memory so the visitor never sees data past the block end.
+        if (ChunkSize > Remaining())
+        {
+            ZEN_ERROR("Invalid chunk in block");
+            return false;
+        }
+
+        IoBuffer         Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
+        IoHash           AttachmentRawHash;
+        uint64_t         AttachmentRawSize;
+        CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
+
+        if (!CompressedChunk)
+        {
+            ZEN_ERROR("Invalid chunk in block");
+            return false;
+        }
+        Visitor(std::move(CompressedChunk), AttachmentRawHash);
+        ReadPtr += ChunkSize;
+    }
+    return true;
+}
+
+// Packs a list of chunks into one compressed block.
+//
+// The uncompressed layout is: VarUInt chunk count, one VarUInt size per chunk, then the chunk
+// payloads in order. The result is wrapped with OodleCompressor::Mermaid at
+// OodleCompressionLevel::None.
+CompressedBuffer
+GenerateBlock(std::vector<SharedBuffer>&& Chunks)
+{
+    const size_t ChunkCount = Chunks.size();
+    SharedBuffer SizeBuffer;
+    {
+        // The header holds ChunkCount + 1 varints (the count itself plus one size per chunk),
+        // each reserved at 9 bytes. Sizing it for only ChunkCount entries can overflow, and
+        // yields a zero-byte buffer for an empty chunk list.
+        constexpr size_t  MaxVarUIntSize = 9;
+        IoBuffer          TempBuffer((ChunkCount + 1) * MaxVarUIntSize);
+        MutableMemoryView View           = TempBuffer.GetMutableView();
+        uint8_t*          BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
+        uint8_t*          BufferEndPtr   = BufferStartPtr;
+        BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
+        for (const SharedBuffer& Chunk : Chunks)
+        {
+            BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(Chunk.GetSize()), BufferEndPtr);
+        }
+        ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
+        // Only the bytes actually written become part of the block.
+        const ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
+        SizeBuffer                       = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+    }
+    CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks)));
+
+    CompressedBuffer CompressedBlock =
+        CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+
+    return CompressedBlock;
+}
+
+struct Block  // Bookkeeping for one entry of the container's "blocks" array.
+{
+ IoHash BlockHash;  // Set by CreateBlock to the generated block's raw hash; BuildContainer asserts it equals IoHash::Zero when no blocks are built.
+ std::vector<IoHash> ChunksInBlock;  // Attachment hashes grouped into this block, filled in by BuildContainer.
+};
+
+// Queues a job on WorkerPool that packs ChunksInBlock into a compressed block, hands the block
+// to AsyncOnBlock and stores its raw hash in Blocks[BlockIndex].
+//
+// OpSectionsLatch is incremented before the job is queued and counted down when the job
+// finishes, whether or not it did any work. The job does nothing if RemoteResult already holds
+// an error or if there are no chunks.
+void
+CreateBlock(WorkerThreadPool& WorkerPool,
+            Latch& OpSectionsLatch,
+            std::vector<SharedBuffer>&& ChunksInBlock,
+            RwLock& SectionsLock,
+            std::vector<Block>& Blocks,
+            size_t BlockIndex,
+            const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+            AsyncRemoteResult& RemoteResult)
+{
+    OpSectionsLatch.AddCount(1);
+    WorkerPool.ScheduleWork(
+        [&RemoteResult, &AsyncOnBlock, &OpSectionsLatch, &SectionsLock, &Blocks, BlockIndex, PendingChunks = std::move(ChunksInBlock)]() mutable {
+            auto LatchGuard = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
+            if (RemoteResult.IsError() || PendingChunks.empty())
+            {
+                return;
+            }
+
+            CompressedBuffer NewBlock     = GenerateBlock(std::move(PendingChunks));
+            const IoHash     NewBlockHash = NewBlock.DecodeRawHash();
+            AsyncOnBlock(std::move(NewBlock), NewBlockHash);
+
+            // A shared lock is enough: the vector is not resized here and only the BlockHash
+            // of this job's own slot is written.
+            RwLock::SharedLockScope SlotScope(SectionsLock);
+            Blocks[BlockIndex].BlockHash = NewBlockHash;
+        });
+}
+
+// Appends one default-constructed Block under an exclusive lock and returns its index.
+size_t
+AddBlock(RwLock& BlocksLock, std::vector<Block>& Blocks)
+{
+    RwLock::ExclusiveLockScope GrowScope(BlocksLock);
+    const size_t               NewIndex = Blocks.size();
+    Blocks.resize(NewIndex + 1);
+    return NewIndex;
+}
+
+// Builds the oplog container object for Oplog.
+//
+// All ops are written into a compressed "ops" section. Every attachment referenced by the ops
+// is looked up in ChunkStore and sorted into one of two groups:
+//   - larger than MaxChunkEmbedSize: reported once through OnLargeAttachment and listed in the
+//     container's top-level "chunks" array
+//   - otherwise: accumulated until the running size reaches MaxBlockSize, then flushed as a block
+// When BuildBlocks is true a flushed block is packed on WorkerPool and delivered through
+// AsyncOnBlock; otherwise its chunk hashes are reported through OnBlockChunks.
+//
+// Returns the container object, or a default CbObject if RemoteResult holds an error (a missing
+// attachment is recorded in RemoteResult as NotFound). All block jobs scheduled by this call
+// have finished by the time it returns.
+CbObject
+BuildContainer(CidStore& ChunkStore,
+               ProjectStore::Oplog& Oplog,
+               size_t MaxBlockSize,
+               size_t MaxChunkEmbedSize,
+               bool BuildBlocks,
+               WorkerThreadPool& WorkerPool,
+               const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+               const std::function<void(const IoHash&)>& OnLargeAttachment,
+               const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
+               AsyncRemoteResult& RemoteResult)
+{
+    using namespace std::literals;
+
+    std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+    CbObjectWriter                             SectionOpsWriter;
+    SectionOpsWriter.BeginArray("ops"sv);
+
+    size_t OpCount = 0;
+
+    CbObject OplogContainerObject;
+    {
+        RwLock             BlocksLock;
+        std::vector<Block> Blocks;
+
+        // Starts at 1 so it cannot reach zero until this function has finished scheduling.
+        Latch BlockCreateLatch(1);
+
+        std::unordered_set<IoHash, IoHash::Hasher> BlockAttachmentHashes;
+
+        size_t                    BlockSize = 0;
+        std::vector<SharedBuffer> ChunksInBlock;
+
+        std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+        Oplog.IterateOplog([&Attachments, &SectionOpsWriter, &OpCount](CbObject Op) {
+            Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert(FieldView.AsAttachment()); });
+            (SectionOpsWriter) << Op;
+            OpCount++;
+        });
+
+        // Turns the chunks accumulated so far into one block entry and resets the accumulators.
+        auto FlushBlock = [&]() {
+            const size_t BlockIndex = AddBlock(BlocksLock, Blocks);
+            if (BuildBlocks)
+            {
+                CreateBlock(WorkerPool,
+                            BlockCreateLatch,
+                            std::move(ChunksInBlock),
+                            BlocksLock,
+                            Blocks,
+                            BlockIndex,
+                            AsyncOnBlock,
+                            RemoteResult);
+            }
+            else
+            {
+                OnBlockChunks(BlockAttachmentHashes);
+            }
+            {
+                // We can share the lock as we are not resizing the vector and only touch ChunksInBlock at our own index
+                RwLock::SharedLockScope _(BlocksLock);
+                std::vector<IoHash>&    Target = Blocks[BlockIndex].ChunksInBlock;
+                Target.insert(Target.end(), BlockAttachmentHashes.begin(), BlockAttachmentHashes.end());
+            }
+            BlockAttachmentHashes.clear();
+            ChunksInBlock.clear();
+            BlockSize = 0;
+        };
+
+        for (const IoHash& AttachmentHash : Attachments)
+        {
+            IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash);
+            if (!Payload)
+            {
+                RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+                                      fmt::format("Failed to find attachment {} for op", AttachmentHash),
+                                      {});
+                ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+                // Do not return from here: block jobs already queued hold references to
+                // Blocks, BlocksLock and BlockCreateLatch, so they must be drained below first.
+                break;
+            }
+            uint64_t PayloadSize = Payload.GetSize();
+            if (PayloadSize > MaxChunkEmbedSize)
+            {
+                if (LargeChunkHashes.insert(AttachmentHash).second)
+                {
+                    OnLargeAttachment(AttachmentHash);
+                }
+                continue;
+            }
+
+            if (!BlockAttachmentHashes.insert(AttachmentHash).second)
+            {
+                continue;
+            }
+
+            BlockSize += PayloadSize;
+            if (BuildBlocks)
+            {
+                ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload)));
+            }
+            else
+            {
+                Payload = {};
+            }
+
+            if (BlockSize >= MaxBlockSize)
+            {
+                FlushBlock();
+            }
+        }
+        // Flush the final, partially filled block. Pointless once an error has been recorded,
+        // since no container is produced in that case.
+        if (BlockSize > 0 && !RemoteResult.IsError())
+        {
+            FlushBlock();
+        }
+        SectionOpsWriter.EndArray();  // "ops"
+
+        CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer());
+        ZEN_DEBUG("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()));
+
+        // Release our own count and wait for every scheduled block job, on success and failure alike.
+        BlockCreateLatch.CountDown();
+        while (!BlockCreateLatch.Wait(1000))
+        {
+            ZEN_INFO("Creating blocks, {} remaining...", BlockCreateLatch.Remaining());
+        }
+
+        if (!RemoteResult.IsError())
+        {
+            CbObjectWriter          OplogContinerWriter;
+            RwLock::SharedLockScope _(BlocksLock);
+            OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
+
+            OplogContinerWriter.BeginArray("blocks"sv);
+            {
+                for (const Block& B : Blocks)
+                {
+                    ZEN_ASSERT(!B.ChunksInBlock.empty());
+                    if (BuildBlocks)
+                    {
+                        ZEN_ASSERT(B.BlockHash != IoHash::Zero);
+
+                        // Real block: the block is the attachment, its chunks are plain hashes.
+                        OplogContinerWriter.BeginObject();
+                        {
+                            OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
+                            OplogContinerWriter.BeginArray("chunks"sv);
+                            {
+                                for (const IoHash& RawHash : B.ChunksInBlock)
+                                {
+                                    OplogContinerWriter.AddHash(RawHash);
+                                }
+                            }
+                            OplogContinerWriter.EndArray();  // "chunks"
+                        }
+                        OplogContinerWriter.EndObject();
+                        continue;
+                    }
+
+                    ZEN_ASSERT(B.BlockHash == IoHash::Zero);
+                    // No block built: each chunk is referenced as an attachment of its own.
+                    OplogContinerWriter.BeginObject();
+                    {
+                        OplogContinerWriter.BeginArray("chunks"sv);
+                        {
+                            for (const IoHash& RawHash : B.ChunksInBlock)
+                            {
+                                OplogContinerWriter.AddBinaryAttachment(RawHash);
+                            }
+                        }
+                        OplogContinerWriter.EndArray();
+                    }
+                    OplogContinerWriter.EndObject();
+                }
+            }
+            OplogContinerWriter.EndArray();  // "blocks"sv
+
+            OplogContinerWriter.BeginArray("chunks"sv);
+            {
+                for (const IoHash& AttachmentHash : LargeChunkHashes)
+                {
+                    OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
+                }
+            }
+            OplogContinerWriter.EndArray();  // "chunks"
+
+            OplogContainerObject = OplogContinerWriter.Save();
+        }
+    }
+    return OplogContainerObject;
+}
+
+// Convenience overload: builds the container with a temporary worker pool and returns it
+// together with the collected error state.
+RemoteProjectStore::LoadContainerResult
+BuildContainer(CidStore& ChunkStore,
+               ProjectStore::Oplog& Oplog,
+               size_t MaxBlockSize,
+               size_t MaxChunkEmbedSize,
+               bool BuildBlocks,
+               const std::function<void(CompressedBuffer&&, const IoHash&)>& AsyncOnBlock,
+               const std::function<void(const IoHash&)>& OnLargeAttachment,
+               const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks)
+{
+    // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we don't want to keep a
+    // WorkerThreadPool alive.
+    // hardware_concurrency() may return 0 when the value cannot be determined; always keep at
+    // least one worker (presumably a pool without threads would never run the block jobs).
+    const unsigned int HardwareThreads = std::thread::hardware_concurrency();
+    size_t             WorkerCount     = HardwareThreads == 0 ? 1u : Min(HardwareThreads, 16u);
+    WorkerThreadPool   WorkerPool(gsl::narrow<int>(WorkerCount));
+
+    AsyncRemoteResult RemoteResult;
+    CbObject          ContainerObject = BuildContainer(ChunkStore,
+                                              Oplog,
+                                              MaxBlockSize,
+                                              MaxChunkEmbedSize,
+                                              BuildBlocks,
+                                              WorkerPool,
+                                              AsyncOnBlock,
+                                              OnLargeAttachment,
+                                              OnBlockChunks,
+                                              RemoteResult);
+    return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
+}
+
+// Uploads Oplog to RemoteStore.
+//
+// Steps:
+//   1. Build the oplog container. Blocks are either uploaded as they are produced or, when
+//      UseTempBlocks is set, written to temporary files under the oplog's temp path.
+//   2. Save the container; the remote answers with the attachment hashes it still needs.
+//   3. If the remote reported needs: upload large attachments, temp blocks and loose chunk
+//      groups. With ForceUpload everything collected is uploaded, otherwise only what the
+//      remote asked for.
+//   4. Finalize the container.
+//
+// The first error from any step is returned; ElapsedSeconds covers the whole call.
+RemoteProjectStore::Result
+SaveOplog(CidStore& ChunkStore,
+          RemoteProjectStore& RemoteStore,
+          ProjectStore::Oplog& Oplog,
+          size_t MaxBlockSize,
+          size_t MaxChunkEmbedSize,
+          bool BuildBlocks,
+          bool UseTempBlocks,
+          bool ForceUpload)
+{
+    using namespace std::literals;
+
+    Stopwatch Timer;
+
+    // We are creating a worker thread pool here since we are uploading a lot of attachments in one go.
+    // Doing upload is a rare and transient occasion so we don't want to keep a WorkerThreadPool alive.
+    // hardware_concurrency() may return 0 when the value cannot be determined; always keep at
+    // least one worker (presumably a pool without threads would never run the scheduled jobs).
+    const unsigned int HardwareThreads = std::thread::hardware_concurrency();
+    size_t             WorkerCount     = HardwareThreads == 0 ? 1u : Min(HardwareThreads, 16u);
+    WorkerThreadPool   WorkerPool(gsl::narrow<int>(WorkerCount));
+
+    std::filesystem::path AttachmentTempPath;
+    if (UseTempBlocks)
+    {
+        AttachmentTempPath = Oplog.TempPath();
+        AttachmentTempPath.append(".pending");
+        CreateDirectories(AttachmentTempPath);
+    }
+
+    AsyncRemoteResult                                    RemoteResult;
+    RwLock                                               AttachmentsLock;
+    std::unordered_set<IoHash, IoHash::Hasher>           LargeAttachments;
+    std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> CreatedBlocks;
+
+    // Block sink used with UseTempBlocks: writes the block to a temp file and remembers a
+    // file-backed buffer for it so it can be uploaded after the container has been saved.
+    auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+                                                                                              const IoHash&      BlockHash) {
+        std::filesystem::path BlockPath = AttachmentTempPath;
+        BlockPath.append(BlockHash.ToHexString());
+        if (!std::filesystem::exists(BlockPath))
+        {
+            IoBuffer BlockBuffer;
+            try
+            {
+                BasicFile BlockFile;
+                BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete);
+                uint64_t Offset = 0;
+                for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments())
+                {
+                    BlockFile.Write(Buffer.GetView(), Offset);
+                    Offset += Buffer.GetSize();
+                }
+                // The buffer takes over the file handle from here on.
+                void* FileHandle = BlockFile.Detach();
+                BlockBuffer      = IoBuffer(IoBuffer::File, FileHandle, 0, Offset);
+            }
+            catch (const std::exception& Ex)
+            {
+                RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+                                      Ex.what(),
+                                      "Unable to create temp block file");
+                return;
+            }
+
+            BlockBuffer.MarkAsDeleteOnClose();
+            {
+                RwLock::ExclusiveLockScope __(AttachmentsLock);
+                CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
+            }
+            ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+        }
+    };
+
+    // Block sink used without UseTempBlocks: uploads the block straight away.
+    auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
+        RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
+        if (Result.ErrorCode)
+        {
+            RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+            ZEN_ERROR("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+            return;
+        }
+        ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
+    };
+
+    std::vector<std::vector<IoHash>> BlockChunks;
+    auto                             OnBlockChunks = [&BlockChunks](const std::unordered_set<IoHash, IoHash::Hasher>& Chunks) {
+        BlockChunks.push_back({Chunks.begin(), Chunks.end()});
+        ZEN_DEBUG("Found {} block chunks", Chunks.size());
+    };
+
+    auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) {
+        {
+            RwLock::ExclusiveLockScope _(AttachmentsLock);
+            LargeAttachments.insert(AttachmentHash);
+        }
+        ZEN_DEBUG("Found attachment {}", AttachmentHash);
+    };
+
+    std::function<void(CompressedBuffer&&, const IoHash&)> OnBlock;
+    if (UseTempBlocks)
+    {
+        OnBlock = MakeTempBlock;
+    }
+    else
+    {
+        OnBlock = UploadBlock;
+    }
+
+    CbObject OplogContainerObject = BuildContainer(ChunkStore,
+                                                   Oplog,
+                                                   MaxBlockSize,
+                                                   MaxChunkEmbedSize,
+                                                   BuildBlocks,
+                                                   WorkerPool,
+                                                   OnBlock,
+                                                   OnLargeAttachment,
+                                                   OnBlockChunks,
+                                                   RemoteResult);
+
+    if (!RemoteResult.IsError())
+    {
+        uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
+        uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
+        ZEN_INFO("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount);
+        RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
+        if (ContainerSaveResult.ErrorCode)
+        {
+            RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
+            ZEN_ERROR("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason());
+        }
+        ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerSaveResult.ElapsedSeconds * 1000)));
+        if (!ContainerSaveResult.Needs.empty())
+        {
+            ZEN_INFO("Filtering needed attachments...");
+            std::vector<IoHash>                        NeededLargeAttachments;
+            std::unordered_set<IoHash, IoHash::Hasher> NeededOtherAttachments;
+            NeededLargeAttachments.reserve(LargeAttachments.size());
+            NeededOtherAttachments.reserve(CreatedBlocks.size());
+            if (ForceUpload)
+            {
+                NeededLargeAttachments.insert(NeededLargeAttachments.end(), LargeAttachments.begin(), LargeAttachments.end());
+            }
+            else
+            {
+                for (const IoHash& RawHash : ContainerSaveResult.Needs)
+                {
+                    if (LargeAttachments.contains(RawHash))
+                    {
+                        NeededLargeAttachments.push_back(RawHash);
+                        continue;
+                    }
+                    NeededOtherAttachments.insert(RawHash);
+                }
+            }
+
+            // Starts at 1 so it cannot reach zero until all upload jobs have been scheduled.
+            Latch SaveAttachmentsLatch(1);
+            if (!NeededLargeAttachments.empty())
+            {
+                ZEN_INFO("Saving large attachments...");
+                for (const IoHash& RawHash : NeededLargeAttachments)
+                {
+                    if (RemoteResult.IsError())
+                    {
+                        break;
+                    }
+
+                    // Look the hash up in CreatedBlocks on this thread and hand the job a copy of
+                    // the buffer: the map is iterated and its entries reset further down while
+                    // upload jobs are still running, so jobs must not touch it themselves.
+                    IoBuffer CreatedPayload;
+                    if (auto It = CreatedBlocks.find(RawHash); It != CreatedBlocks.end())
+                    {
+                        CreatedPayload = It->second;
+                    }
+
+                    SaveAttachmentsLatch.AddCount(1);
+                    WorkerPool.ScheduleWork([&ChunkStore,
+                                             &RemoteStore,
+                                             &SaveAttachmentsLatch,
+                                             &RemoteResult,
+                                             RawHash,
+                                             CreatedPayload = std::move(CreatedPayload)]() {
+                        auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+                        if (RemoteResult.IsError())
+                        {
+                            return;
+                        }
+
+                        IoBuffer Payload = CreatedPayload;
+                        if (!Payload)
+                        {
+                            Payload = ChunkStore.FindChunkByCid(RawHash);
+                        }
+                        if (!Payload)
+                        {
+                            RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
+                                                  fmt::format("Failed to find attachment {}", RawHash),
+                                                  {});
+                            ZEN_ERROR("Failed to build container ({}). Reason: '{}'",
+                                      RemoteResult.GetError(),
+                                      RemoteResult.GetErrorReason());
+                            return;
+                        }
+
+                        RemoteProjectStore::SaveAttachmentResult Result =
+                            RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+                        if (Result.ErrorCode)
+                        {
+                            RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+                            ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+                                      RawHash,
+                                      NiceBytes(Payload.GetSize()),
+                                      RemoteResult.GetError(),
+                                      RemoteResult.GetErrorReason());
+                            return;
+                        }
+                        ZEN_DEBUG("Saved attachment {}, {} in {}",
+                                  RawHash,
+                                  NiceBytes(Payload.GetSize()),
+                                  NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+                    });
+                }
+            }
+
+            if (!CreatedBlocks.empty())
+            {
+                ZEN_INFO("Saving created block attachments...");
+                for (auto& It : CreatedBlocks)
+                {
+                    if (RemoteResult.IsError())
+                    {
+                        break;
+                    }
+                    const IoHash& RawHash = It.first;
+                    if (ForceUpload || NeededOtherAttachments.contains(RawHash))
+                    {
+                        IoBuffer Payload = It.second;
+                        ZEN_ASSERT(Payload);
+                        SaveAttachmentsLatch.AddCount(1);
+                        WorkerPool.ScheduleWork(
+                            [&RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() {
+                                auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+                                if (RemoteResult.IsError())
+                                {
+                                    return;
+                                }
+
+                                RemoteProjectStore::SaveAttachmentResult Result =
+                                    RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash);
+                                if (Result.ErrorCode)
+                                {
+                                    RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+                                    ZEN_ERROR("Failed to save attachment '{}', {} ({}). Reason: '{}'",
+                                              RawHash,
+                                              NiceBytes(Payload.GetSize()),
+                                              RemoteResult.GetError(),
+                                              RemoteResult.GetErrorReason());
+                                    return;
+                                }
+
+                                ZEN_DEBUG("Saved attachment {}, {} in {}",
+                                          RawHash,
+                                          NiceBytes(Payload.GetSize()),
+                                          NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+                            });
+                    }
+                    // Drop the map's reference; a scheduled job keeps its own copy of the buffer.
+                    It.second = {};
+                }
+            }
+
+            if (!BlockChunks.empty())
+            {
+                ZEN_INFO("Saving chunk block attachments...");
+                for (const std::vector<IoHash>& Chunks : BlockChunks)
+                {
+                    if (RemoteResult.IsError())
+                    {
+                        break;
+                    }
+                    std::vector<IoHash> NeededChunks;
+                    if (ForceUpload)
+                    {
+                        NeededChunks = Chunks;
+                    }
+                    else
+                    {
+                        NeededChunks.reserve(Chunks.size());
+                        for (const IoHash& Chunk : Chunks)
+                        {
+                            if (NeededOtherAttachments.contains(Chunk))
+                            {
+                                NeededChunks.push_back(Chunk);
+                            }
+                        }
+                        if (NeededChunks.empty())
+                        {
+                            continue;
+                        }
+                    }
+                    SaveAttachmentsLatch.AddCount(1);
+                    WorkerPool.ScheduleWork(
+                        [&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, &RemoteResult, NeededChunks = std::move(NeededChunks)]() {
+                            auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
+                            if (RemoteResult.IsError())
+                            {
+                                return;
+                            }
+
+                            std::vector<SharedBuffer> ChunkBuffers;
+                            ChunkBuffers.reserve(NeededChunks.size());
+                            for (const IoHash& Chunk : NeededChunks)
+                            {
+                                IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk);
+                                if (!ChunkPayload)
+                                {
+                                    RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
+                                                          fmt::format("Missing chunk {}"sv, Chunk),
+                                                          fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
+                                    // Nothing sensible can be uploaded for this group any more.
+                                    return;
+                                }
+                                ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload)));
+                            }
+                            RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
+                            if (Result.ErrorCode)
+                            {
+                                RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+                                ZEN_ERROR("Failed to save attachments with {} chunks ({}). Reason: '{}'",
+                                          NeededChunks.size(),
+                                          RemoteResult.GetError(),
+                                          RemoteResult.GetErrorReason());
+                                return;
+                            }
+                            ZEN_DEBUG("Saved {} bulk attachments in {}",
+                                      NeededChunks.size(),
+                                      NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+                        });
+                }
+            }
+            // Release our own count and wait for every upload job.
+            SaveAttachmentsLatch.CountDown();
+            while (!SaveAttachmentsLatch.Wait(1000))
+            {
+                ZEN_INFO("Saving attachments, {} remaining...", SaveAttachmentsLatch.Remaining());
+            }
+        }
+
+        if (!RemoteResult.IsError())
+        {
+            ZEN_INFO("Finalizing oplog container...");
+            RemoteProjectStore::Result ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
+            if (ContainerFinalizeResult.ErrorCode)
+            {
+                RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
+                ZEN_ERROR("Failed to finalize oplog container {} ({}). Reason: '{}'",
+                          ContainerSaveResult.RawHash,
+                          RemoteResult.GetError(),
+                          RemoteResult.GetErrorReason());
+            }
+            ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast<uint64_t>(ContainerFinalizeResult.ElapsedSeconds * 1000)));
+        }
+    }
+
+    RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
+    Result.ElapsedSeconds             = Timer.GetElapsedTimeMs() / 1000.0;
+    ZEN_INFO("Saved oplog {} in {}",
+             RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+             NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+    return Result;
+}
+
+// Applies a loaded oplog container to Oplog.
+//
+// First walks the container's attachment references and reports what is missing locally:
+//   - every large chunk for which HasAttachment returns false is passed to OnNeedAttachment
+//   - for a block without "rawhash" (loose chunks) the missing chunk hashes are passed to
+//     OnNeedBlock together with IoHash::Zero
+//   - for a real block OnNeedBlock is called once with the block hash as soon as one of its
+//     chunks is missing
+// Then decompresses the "ops" section and appends every op to Oplog.
+//
+// Returns a BadRequest result if the ops section cannot be parsed or an op cannot be appended;
+// otherwise a result carrying only the elapsed time.
+RemoteProjectStore::Result
+SaveOplogContainer(ProjectStore::Oplog& Oplog,
+                   const CbObject& ContainerObject,
+                   const std::function<bool(const IoHash& RawHash)>& HasAttachment,
+                   const std::function<void(const IoHash& BlockHash, std::vector<IoHash>&& Chunks)>& OnNeedBlock,
+                   const std::function<void(const IoHash& RawHash)>& OnNeedAttachment)
+{
+    using namespace std::literals;
+
+    Stopwatch  Timer;
+    const auto ElapsedSeconds = [&Timer]() { return Timer.GetElapsedTimeMs() / 1000.0; };
+
+    CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
+    for (CbFieldView LargeChunksField : LargeChunksArray)
+    {
+        IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
+        if (HasAttachment(AttachmentHash))
+        {
+            continue;
+        }
+        OnNeedAttachment(AttachmentHash);
+    }
+
+    CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+    for (CbFieldView BlockField : BlocksArray)
+    {
+        CbObjectView BlockView = BlockField.AsObjectView();
+        IoHash       BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
+
+        CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
+        if (BlockHash == IoHash::Zero)
+        {
+            // Loose chunks: request exactly the ones that are missing.
+            std::vector<IoHash> NeededChunks;
+            // Num() is the element count (it is what SaveOplog uses to report the number of
+            // entries in these arrays), which is the right capacity hint here.
+            NeededChunks.reserve(ChunksArray.Num());
+            for (CbFieldView ChunkField : ChunksArray)
+            {
+                IoHash ChunkHash = ChunkField.AsBinaryAttachment();
+                if (HasAttachment(ChunkHash))
+                {
+                    continue;
+                }
+                NeededChunks.emplace_back(ChunkHash);
+            }
+
+            if (!NeededChunks.empty())
+            {
+                OnNeedBlock(IoHash::Zero, std::move(NeededChunks));
+            }
+            continue;
+        }
+
+        // Real block: a single missing chunk means the whole block has to be fetched.
+        for (CbFieldView ChunkField : ChunksArray)
+        {
+            IoHash ChunkHash = ChunkField.AsHash();
+            if (HasAttachment(ChunkHash))
+            {
+                continue;
+            }
+
+            OnNeedBlock(BlockHash, {});
+            break;
+        }
+    }
+
+    MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+    IoBuffer   OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+    IoBuffer   SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+    CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
+    if (!SectionObject)
+    {
+        ZEN_ERROR("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type");
+        return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+                                          ElapsedSeconds(),
+                                          "Section has unexpected data type",
+                                          "Failed to save oplog container"};
+    }
+
+    CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
+    for (CbFieldView OpEntry : OpsArray)
+    {
+        // Copy the op into a buffer of its own so it can be handed over as a CbObject.
+        CbObjectView Core = OpEntry.AsObjectView();
+        BinaryWriter Writer;
+        Core.CopyTo(Writer);
+        MemoryView     OpView = Writer.GetView();
+        IoBuffer       OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
+        CbObject       Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
+        const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op);
+        if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+        {
+            return RemoteProjectStore::Result{gsl::narrow<int>(HttpResponseCode::BadRequest),
+                                              ElapsedSeconds(),
+                                              "Failed saving op",
+                                              "Failed to save oplog container"};
+        }
+        ZEN_DEBUG("oplog entry #{}", OpLsn);
+    }
+    return RemoteProjectStore::Result{.ElapsedSeconds = ElapsedSeconds()};
+}
+
+// Downloads an oplog from RemoteStore into Oplog and ChunkStore.
+//
+// Loads the container, applies its ops through SaveOplogContainer and fetches missing
+// attachments on a temporary worker pool: loose chunk groups in bulk, real blocks as one
+// attachment that is then split into chunks, large attachments one by one. With ForceDownload
+// every attachment is treated as missing.
+//
+// Returns the SaveOplogContainer error if there is one, otherwise the first download error;
+// ElapsedSeconds covers the whole call.
+RemoteProjectStore::Result
+LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload)
+{
+    using namespace std::literals;
+
+    Stopwatch Timer;
+
+    // We are creating a worker thread pool here since we are downloading a lot of attachments in one go and we don't want to keep a
+    // WorkerThreadPool alive.
+    // hardware_concurrency() may return 0 when the value cannot be determined; always keep at
+    // least one worker (presumably a pool without threads would never run the scheduled jobs).
+    const unsigned int HardwareThreads = std::thread::hardware_concurrency();
+    size_t             WorkerCount     = HardwareThreads == 0 ? 1u : Min(HardwareThreads, 16u);
+    WorkerThreadPool   WorkerPool(gsl::narrow<int>(WorkerCount));
+
+    // Large attachments already requested, so each one is downloaded only once.
+    std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+
+    RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+    if (LoadContainerResult.ErrorCode)
+    {
+        ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode);
+        return RemoteProjectStore::Result{.ErrorCode      = LoadContainerResult.ErrorCode,
+                                          .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
+                                          .Reason         = LoadContainerResult.Reason,
+                                          .Text           = LoadContainerResult.Text};
+    }
+    ZEN_DEBUG("Loaded container in {}", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)));
+
+    AsyncRemoteResult RemoteResult;
+    // Starts at 1 so it cannot reach zero until all download jobs have been scheduled.
+    Latch AttachmentsWorkLatch(1);
+
+    auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
+        return !ForceDownload && ChunkStore.ContainsChunk(RawHash);
+    };
+    auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult](const IoHash&         BlockHash,
+                                                                                                      std::vector<IoHash>&& Chunks) {
+        if (BlockHash == IoHash::Zero)
+        {
+            // Loose chunks: fetch the listed hashes in one bulk request.
+            AttachmentsWorkLatch.AddCount(1);
+            WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() {
+                auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+                if (RemoteResult.IsError())
+                {
+                    return;
+                }
+
+                RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+                if (Result.ErrorCode)
+                {
+                    RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
+                    ZEN_ERROR("Failed to load attachments with {} chunks ({}). Reason: '{}'",
+                              Chunks.size(),
+                              RemoteResult.GetError(),
+                              RemoteResult.GetErrorReason());
+                    return;
+                }
+                ZEN_DEBUG("Loaded {} bulk attachments in {}",
+                          Chunks.size(),
+                          NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)));
+                for (const auto& It : Result.Chunks)
+                {
+                    ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
+                }
+            });
+            return;
+        }
+        // Real block: fetch the block attachment and store every chunk it contains.
+        AttachmentsWorkLatch.AddCount(1);
+        WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() {
+            auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+            if (RemoteResult.IsError())
+            {
+                return;
+            }
+            RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
+            if (BlockResult.ErrorCode)
+            {
+                RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+                ZEN_ERROR("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'",
+                          BlockHash,
+                          RemoteResult.GetError(),
+                          RemoteResult.GetErrorReason());
+                return;
+            }
+            ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+
+            if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+                    ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
+                }))
+            {
+                RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+                                      fmt::format("Invalid format for block {}", BlockHash),
+                                      {});
+                ZEN_ERROR("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'",
+                          BlockHash,
+                          RemoteResult.GetError(),
+                          RemoteResult.GetErrorReason());
+                return;
+            }
+        });
+    };
+
+    auto OnNeedAttachment =
+        [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments](const IoHash& RawHash) {
+            if (!Attachments.insert(RawHash).second)
+            {
+                return;
+            }
+
+            AttachmentsWorkLatch.AddCount(1);
+            WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() {
+                auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
+                if (RemoteResult.IsError())
+                {
+                    return;
+                }
+                RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
+                if (AttachmentResult.ErrorCode)
+                {
+                    RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+                    ZEN_ERROR("Failed to download attachment {}, reason: '{}', error code: {}",
+                              RawHash,
+                              AttachmentResult.Reason,
+                              AttachmentResult.ErrorCode);
+                    return;
+                }
+                ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+                ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
+            });
+        };
+
+    RemoteProjectStore::Result Result =
+        SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+    // Release our own count and wait for every download job, also when applying the ops failed:
+    // the jobs hold references to locals of this function.
+    AttachmentsWorkLatch.CountDown();
+    while (!AttachmentsWorkLatch.Wait(1000))
+    {
+        ZEN_INFO("Loading attachments, {} remaining...", AttachmentsWorkLatch.Remaining());
+    }
+    if (Result.ErrorCode == 0)
+    {
+        Result = RemoteResult.ConvertResult();
+    }
+    Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+
+    // Report on the combined result so that a failure while applying the ops is not logged as success.
+    ZEN_INFO("Loaded oplog {} in {}",
+             Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
+             NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)));
+
+    return Result;
+}
+
+} // namespace zen