// Copyright Epic Games, Inc. All Rights Reserved. #include "remoteprojectstore.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { /* OplogContainer Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller { CbArray("ops") { CbObject Op (CbFieldType::BinaryAttachment Attachments[]) (OpData) } } CbArray("blocks") CbObject CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) CbArray("chunks") CbFieldType::Hash // Chunk hashes CbArray("chunks") // Optional, only if we are not creating blocks (Zen) CbFieldType::BinaryAttachment // Chunk attachment hashes CompressedBinary ChunkBlock { VarUInt ChunkCount VarUInt ChunkSizes[ChunkCount] uint8_t[chunksize])[ChunkCount] } */ ////////////////////////////// AsyncRemoteResult struct AsyncRemoteResult { void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText) { int32_t Expected = 0; if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1)) { m_ErrorReason = ErrorReason; m_ErrorText = ErrorText; } } bool IsError() const { return m_ErrorCode.load() != 0; } int GetError() const { return m_ErrorCode.load(); }; const std::string& GetErrorReason() const { return m_ErrorReason; }; const std::string& GetErrorText() const { return m_ErrorText; }; RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const { return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText}; } private: std::atomic m_ErrorCode = 0; std::string m_ErrorReason; std::string m_ErrorText; }; void ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining) { if (OptionalContext) { ZEN_ASSERT(Total > 0); OptionalContext->ReportProgress(CurrentOp, gsl::narrow((100 * (Total - Remaining)) / Total)); } ZEN_INFO("{}", CurrentOp); } void ReportMessage(JobContext* OptionalContext, std::string_view Message) { if (OptionalContext) { OptionalContext->ReportMessage(Message); } ZEN_INFO("{}", Message); } bool IsCancelled(JobContext* OptionalContext) { if (!OptionalContext) { return false; } return OptionalContext->IsCancelled(); } bool IterateBlock(IoBuffer&& CompressedBlock, std::function Visitor) { IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer(); MemoryView BlockView = BlockPayload.GetView(); const uint8_t* ReadPtr = reinterpret_cast(BlockView.GetData()); uint32_t NumberSize; uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); ReadPtr += NumberSize; std::vector ChunkSizes; ChunkSizes.reserve(ChunkCount); while (ChunkCount--) { ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); ReadPtr += NumberSize; } ptrdiff_t TempBufferLength = std::distance(reinterpret_cast(BlockView.GetData()), ReadPtr); ZEN_ASSERT(TempBufferLength > 0); for (uint64_t ChunkSize : ChunkSizes) { IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); IoHash AttachmentRawHash; uint64_t AttachmentRawSize; CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); if (!CompressedChunk) { ZEN_ERROR("Invalid chunk in block"); return false; } Visitor(std::move(CompressedChunk), AttachmentRawHash); ReadPtr += ChunkSize; ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); } return true; }; CompressedBuffer GenerateBlock(std::vector&& Chunks) { size_t ChunkCount = Chunks.size(); SharedBuffer SizeBuffer; { IoBuffer TempBuffer(ChunkCount * 9); MutableMemoryView View = TempBuffer.GetMutableView(); uint8_t* BufferStartPtr = reinterpret_cast(View.GetData()); uint8_t* BufferEndPtr = BufferStartPtr; BufferEndPtr += WriteVarUInt(gsl::narrow(ChunkCount), BufferEndPtr); auto It = Chunks.begin(); while (It != Chunks.end()) { BufferEndPtr += WriteVarUInt(gsl::narrow(It->GetSize()), BufferEndPtr); It++; } ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); SizeBuffer = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow(TempBufferLength))); } CompositeBuffer AllBuffers(std::move(SizeBuffer), CompositeBuffer(std::move(Chunks))); CompressedBuffer CompressedBlock = CompressedBuffer::Compress(std::move(AllBuffers), OodleCompressor::Mermaid, OodleCompressionLevel::None); return CompressedBlock; } struct Block { IoHash BlockHash; std::vector ChunksInBlock; }; void CreateBlock(WorkerThreadPool& WorkerPool, Latch& OpSectionsLatch, std::vector&& ChunksInBlock, RwLock& SectionsLock, std::vector& Blocks, size_t BlockIndex, const std::function& AsyncOnBlock, AsyncRemoteResult& RemoteResult) { ZEN_INFO("Generating block with {} attachments", ChunksInBlock.size()); OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork( [&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable { auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } if (!Chunks.empty()) { CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks)); // Move to callback and return IoHash IoHash BlockHash = CompressedBlock.DecodeRawHash(); AsyncOnBlock(std::move(CompressedBlock), BlockHash); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); Blocks[BlockIndex].BlockHash = BlockHash; } } }); } size_t AddBlock(RwLock& BlocksLock, std::vector& Blocks) { size_t BlockIndex; { RwLock::ExclusiveLockScope _(BlocksLock); BlockIndex = Blocks.size(); Blocks.resize(BlockIndex + 1); } return BlockIndex; } CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, const std::vector& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function)>& OnBlockChunks, tsl::robin_map* OutLooseAttachments, JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; std::unordered_set LargeChunkHashes; CbObjectWriter SectionOpsWriter; SectionOpsWriter.BeginArray("ops"sv); size_t OpCount = 0; CbObject OplogContainerObject; { RwLock BlocksLock; std::vector Blocks; CompressedBuffer OpsBuffer; std::unordered_set BlockAttachmentHashes; size_t BlockSize = 0; std::vector ChunksInBlock; std::unordered_map Attachments; auto RewriteOp = [&](int LSN, CbObjectView Op, const std::function& CB) { bool OpRewritten = false; CbArrayView Files = Op["files"sv].AsArrayView(); if (Files.Num() == 0) { CB(Op); return; } CbWriter Cbo; Cbo.BeginArray("files"sv); for (CbFieldView& Field : Files) { bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) { IoHash DataHash = View["data"sv].AsHash(); if (DataHash == IoHash::Zero) { { // Read file contents into memory and compress std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = Project.RootDir / ServerPath; BasicFile DataFile; DataFile.Open(FilePath, BasicFile::Mode::kRead); IoBuffer FileIoBuffer = DataFile.ReadAll(); DataFile.Close(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(std::move(FileIoBuffer))); DataHash = Compressed.DecodeRawHash(); uint64_t PayloadSize = Compressed.GetCompressed().GetSize(); if (PayloadSize > MaxChunkEmbedSize) { // Write it out as a temporary file IoBuffer AttachmentBuffer; std::filesystem::path AttachmentPath = Oplog.TempPath() / DataHash.ToHexString(); if (std::filesystem::is_regular_file(AttachmentPath)) { AttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); if (AttachmentBuffer.GetSize() != PayloadSize) { AttachmentBuffer = IoBuffer{}; } } if (!AttachmentBuffer) { BasicFile BlockFile; uint32_t RetriesLeft = 3; BlockFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to create temp attachment '{}', reason: '{}', retries left: {}.", AttachmentPath, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); uint64_t Offset = 0; for (const SharedBuffer& Buffer : Compressed.GetCompressed().GetSegments()) { BlockFile.Write(Buffer.GetView(), Offset); Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); AttachmentBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); AttachmentBuffer.SetDeleteOnClose(true); ZEN_DEBUG("Saved temp attachment {}, {}", DataHash, NiceBytes(PayloadSize)); } OutLooseAttachments->insert_or_assign(DataHash, AttachmentBuffer); } else { // If it is small we just hang on to the compressed buffer OutLooseAttachments->insert_or_assign(DataHash, Compressed.GetCompressed().Flatten().AsIoBuffer()); } } // Rewrite file array entry with new data reference CbObjectWriter Writer; RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { // omit this field as we will write it explicitly ourselves return true; } return false; }); Writer.AddBinaryAttachment("data"sv, DataHash); CbObject RewrittenOp = Writer.Save(); Cbo.AddObject(std::move(RewrittenOp)); CopyField = false; Attachments.insert_or_assign(DataHash, LSN); } } if (CopyField) { Cbo.AddField(Field); } else { OpRewritten = true; } } if (!OpRewritten) { CB(Op); return; } Cbo.EndArray(); CbArray FilesArray = Cbo.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { if (Field.GetName() == "files"sv) { NewWriter.AddArray("files"sv, FilesArray); return true; } return false; }); CB(RewrittenOp); }; ReportMessage(OptionalContext, "Building exported oplog and fetching attachments"); tsl::robin_map OpLSNToKey; Oplog.IterateOplogWithKey([&](int LSN, const Oid&, CbObjectView Op) { if (RemoteResult.IsError()) { return; } std::string_view Key = Op["key"sv].AsString(); OpLSNToKey.insert({LSN, std::string(Key)}); Op.IterateAttachments([&](CbFieldView FieldView) { Attachments.insert({FieldView.AsAttachment(), LSN}); }); if (OutLooseAttachments != nullptr) { RewriteOp(LSN, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); } else { SectionOpsWriter << Op; } OpCount++; if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } }); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); return {}; } if (!Attachments.empty() && !KnownBlocks.empty()) { ReportMessage(OptionalContext, fmt::format("Checking {} known blocks for reuse", KnownBlocks.size())); size_t ReusedBlockCount = 0; for (const Block& KnownBlock : KnownBlocks) { size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (Attachments.contains(KnownHash)) { FoundAttachmentCount++; } } size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount; // TODO: Configure reuse-level if (ReusePercent > 80) { ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { Attachments.erase(KnownHash); } BlocksLock.WithExclusiveLock([&]() { Blocks.push_back(KnownBlock); }); ReusedBlockCount++; } else if (FoundAttachmentCount > 0) { ZEN_DEBUG("Skipping block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); } } ReportMessage(OptionalContext, fmt::format("Reusing {} out of {} known blocks", ReusedBlockCount, KnownBlocks.size())); } ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", Attachments.size(), OpLSNToKey.size())); // Sort attachments so we get predictable blocks for the same oplog upload std::vector SortedAttachments; SortedAttachments.reserve(Attachments.size()); for (const auto& It : Attachments) { SortedAttachments.push_back(It.first); } std::sort(SortedAttachments.begin(), SortedAttachments.end(), [&Attachments, &OpLSNToKey](const IoHash& Lhs, const IoHash& Rhs) { auto LhsLNSIt = Attachments.find(Lhs); ZEN_ASSERT_SLOW(LhsLNSIt != Attachments.end()); auto RhsLNSIt = Attachments.find(Rhs); ZEN_ASSERT_SLOW(RhsLNSIt != Attachments.end()); if (LhsLNSIt->second == RhsLNSIt->second) { return Lhs < Rhs; } auto LhsKeyIt = OpLSNToKey.find(LhsLNSIt->second); ZEN_ASSERT_SLOW(LhsKeyIt != OpLSNToKey.end()); auto RhsKeyIt = OpLSNToKey.find(RhsLNSIt->second); ZEN_ASSERT_SLOW(RhsKeyIt != OpLSNToKey.end()); return LhsKeyIt->second < RhsKeyIt->second; }); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); return {}; } ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks and loose attachments", SortedAttachments.size(), OpLSNToKey.size())); auto GetPayload = [&](const IoHash& AttachmentHash) { if (OutLooseAttachments != nullptr) { auto PayloadIt = OutLooseAttachments->find(AttachmentHash); if (PayloadIt != OutLooseAttachments->end()) { return PayloadIt->second; } } return ChunkStore.FindChunkByCid(AttachmentHash); }; int LastLSNOp = -1; size_t GeneratedBlockCount = 0; size_t LargeAttachmentCount = 0; Latch BlockCreateLatch(1); for (const IoHash& AttachmentHash : SortedAttachments) { if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); } return {}; } auto It = Attachments.find(AttachmentHash); ZEN_ASSERT(It != Attachments.end()); IoBuffer Payload = GetPayload(AttachmentHash); if (!Payload) { std::optional Op = Oplog.GetOpByIndex(It->second); ZEN_ASSERT(Op.has_value()); ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); Sb.Append(AttachmentHash.ToHexString()); Sb.Append("' for op: \n"); Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), Sb.ToString(), {}); ZEN_ERROR("Failed to build container ({}). Reason: '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); } return {}; } uint64_t PayloadSize = Payload.GetSize(); if (PayloadSize > MaxChunkEmbedSize) { if (LargeChunkHashes.insert(AttachmentHash).second) { OnLargeAttachment(AttachmentHash); LargeAttachmentCount++; } continue; } if (!BlockAttachmentHashes.insert(AttachmentHash).second) { continue; } const int CurrentOpLSN = It->second; BlockSize += PayloadSize; if (BuildBlocks) { ChunksInBlock.emplace_back(SharedBuffer(std::move(Payload))); } else { Payload = {}; } if (BlockSize >= MaxBlockSize && (CurrentOpLSN != LastLSNOp)) { size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { CreateBlock(WorkerPool, BlockCreateLatch, std::move(ChunksInBlock), BlocksLock, Blocks, BlockIndex, AsyncOnBlock, RemoteResult); } else { ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); OnBlockChunks(BlockAttachmentHashes); } { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), BlockAttachmentHashes.begin(), BlockAttachmentHashes.end()); } BlockAttachmentHashes.clear(); ChunksInBlock.clear(); BlockSize = 0; GeneratedBlockCount++; } LastLSNOp = CurrentOpLSN; } if (BlockSize > 0) { size_t BlockIndex = AddBlock(BlocksLock, Blocks); if (BuildBlocks) { CreateBlock(WorkerPool, BlockCreateLatch, std::move(ChunksInBlock), BlocksLock, Blocks, BlockIndex, AsyncOnBlock, RemoteResult); } else { ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size()); OnBlockChunks(BlockAttachmentHashes); } { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), BlockAttachmentHashes.begin(), BlockAttachmentHashes.end()); } BlockAttachmentHashes.clear(); ChunksInBlock.clear(); BlockSize = 0; GeneratedBlockCount++; } SectionOpsWriter.EndArray(); // "ops" if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ZEN_INFO("Aborting, {} blocks remaining...", BlockCreateLatch.Remaining()); } return {}; } ReportMessage(OptionalContext, fmt::format("Assembled {} attachments from {} ops into {} blocks and {} loose attachments", SortedAttachments.size(), OpLSNToKey.size(), GeneratedBlockCount, LargeAttachmentCount)); CompressedBuffer CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer()); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); } if (GeneratedBlockCount > 0) { ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0); } return {}; } ReportMessage(OptionalContext, fmt::format("Added oplog section {}, {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.GetCompressedSize()))); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) { ptrdiff_t Remaining = BlockCreateLatch.Remaining(); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); while (!BlockCreateLatch.Wait(1000)) { Remaining = BlockCreateLatch.Remaining(); ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining); } ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); return {}; } ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining); } if (GeneratedBlockCount > 0) { ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0); } if (!RemoteResult.IsError()) { CbObjectWriter OplogContinerWriter; RwLock::SharedLockScope _(BlocksLock); OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); OplogContinerWriter.BeginArray("blocks"sv); { for (const Block& B : Blocks) { ZEN_ASSERT(!B.ChunksInBlock.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); OplogContinerWriter.BeginObject(); { OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunksInBlock) { OplogContinerWriter.AddHash(RawHash); } } OplogContinerWriter.EndArray(); // "chunks" } OplogContinerWriter.EndObject(); continue; } ZEN_ASSERT(B.BlockHash == IoHash::Zero); OplogContinerWriter.BeginObject(); { OplogContinerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunksInBlock) { OplogContinerWriter.AddBinaryAttachment(RawHash); } } OplogContinerWriter.EndArray(); } OplogContinerWriter.EndObject(); } } OplogContinerWriter.EndArray(); // "blocks"sv OplogContinerWriter.BeginArray("chunks"sv); { for (const IoHash& AttachmentHash : LargeChunkHashes) { OplogContinerWriter.AddBinaryAttachment(AttachmentHash); } } OplogContinerWriter.EndArray(); // "chunks" OplogContainerObject = OplogContinerWriter.Save(); } } return OplogContainerObject; } RemoteProjectStore::LoadContainerResult BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool BuildBlocks, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function)>& OnBlockChunks, tsl::robin_map* OutOptionalTempAttachments) { WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, BuildBlocks, {}, WorkerPool, AsyncOnBlock, OnLargeAttachment, OnBlockChunks, OutOptionalTempAttachments, nullptr, RemoteResult); return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject}; } void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set& LargeAttachments, const std::vector>& BlockChunks, const std::unordered_map& CreatedBlocks, const tsl::robin_map& TempAttachments, const std::unordered_set& Needs, bool ForceAll, AsyncRemoteResult& RemoteResult, JobContext* OptionalContext) { using namespace std::literals; if (Needs.empty() && !ForceAll) { return; } ReportMessage(OptionalContext, "Filtering needed attachments..."); std::unordered_set AttachmentsToUpload; size_t BlockAttachmentCountToUpload = 0; size_t LargeAttachmentCountToUpload = 0; std::atomic BulkAttachmentCountToUpload = 0; AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size()); for (const auto& CreatedBlock : CreatedBlocks) { if (ForceAll || Needs.contains(CreatedBlock.first)) { AttachmentsToUpload.insert(CreatedBlock.first); BlockAttachmentCountToUpload++; } } for (const IoHash& LargeAttachment : LargeAttachments) { if (ForceAll || Needs.contains(LargeAttachment)) { AttachmentsToUpload.insert(LargeAttachment); LargeAttachmentCountToUpload++; } } for (const std::vector& BlockHashes : BlockChunks) { if (ForceAll) { AttachmentsToUpload.insert(BlockHashes.begin(), BlockHashes.end()); BulkAttachmentCountToUpload += BlockHashes.size(); continue; } for (const IoHash& Hash : BlockHashes) { if (Needs.contains(Hash)) { AttachmentsToUpload.insert(Hash); BulkAttachmentCountToUpload++; } } } for (const IoHash& Needed : Needs) { if (!AttachmentsToUpload.contains(Needed)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); ZEN_ERROR("Failed to upload attachment '{}'. ({}). Reason: '{}'", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } } if (AttachmentsToUpload.empty()) { ReportMessage(OptionalContext, "No attachments needed"); return; } if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } return; } ReportMessage(OptionalContext, fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", AttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, BulkAttachmentCountToUpload.load())); ptrdiff_t AttachmentsToSave(0); Latch SaveAttachmentsLatch(1); for (const IoHash& RawHash : LargeAttachments) { if (RemoteResult.IsError()) { break; } if (!AttachmentsToUpload.contains(RawHash)) { continue; } IoBuffer Payload; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second; } else if (auto LooseTmpFileIt = TempAttachments.find(RawHash); LooseTmpFileIt != TempAttachments.end()) { Payload = LooseTmpFileIt->second; } SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; WorkerPool.ScheduleWork( [&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, TempPayload = std::move(Payload)]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } IoBuffer Payload = TempPayload ? TempPayload : ChunkStore.FindChunkByCid(RawHash); if (!Payload) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), fmt::format("Failed to find attachment {}", RawHash), {}); ZEN_WARN("Failed to save attachment '{}' ({}). Reason: '{}'", RawHash, RemoteResult.GetErrorReason(), RemoteResult.GetError()); return; } RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } ZEN_DEBUG("Saved attachment {}, {} in {}", RawHash, NiceBytes(Payload.GetSize()), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); return; }); } if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } return; } for (auto& It : CreatedBlocks) { if (RemoteResult.IsError()) { break; } const IoHash& RawHash = It.first; if (!AttachmentsToUpload.contains(RawHash)) { continue; } IoBuffer Payload = It.second; ZEN_ASSERT(Payload); SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, Payload = std::move(Payload), RawHash]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(Payload)), RawHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ZEN_WARN("Failed to save attachment '{}', {} ({}). Reason: '{}'", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } ZEN_DEBUG("Saved attachment {}, {} in {}", RawHash, NiceBytes(Payload.GetSize()), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); return; }); } if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } return; } for (const std::vector& Chunks : BlockChunks) { if (RemoteResult.IsError()) { break; } std::vector NeededChunks; NeededChunks.reserve(Chunks.size()); for (const IoHash& Chunk : Chunks) { if (AttachmentsToUpload.contains(Chunk)) { NeededChunks.push_back(Chunk); } } if (NeededChunks.empty()) { continue; } SaveAttachmentsLatch.AddCount(1); AttachmentsToSave++; WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, &RemoteResult, &Chunks, NeededChunks = std::move(NeededChunks), &BulkAttachmentCountToUpload]() { auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); }); std::vector ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); for (const IoHash& Chunk : NeededChunks) { IoBuffer ChunkPayload = ChunkStore.FindChunkByCid(Chunk); if (!ChunkPayload) { RemoteResult.SetError(static_cast(HttpResponseCode::NotFound), fmt::format("Missing chunk {}"sv, Chunk), fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); ChunkBuffers.clear(); break; } ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload))); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ZEN_WARN("Failed to save attachments with {} chunks ({}). Reason: '{}'", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } ZEN_DEBUG("Saved {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); BulkAttachmentCountToUpload.fetch_sub(Chunks.size()); }); } SaveAttachmentsLatch.CountDown(); while (!SaveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } } ReportProgress( OptionalContext, fmt::format("Saving attachments, {} remaining...", BlockChunks.empty() ? Remaining : BulkAttachmentCountToUpload.load()), AttachmentsToSave, Remaining); } if (AttachmentsToSave > 0) { ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0); } } RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, bool EmbedLooseFiles, bool BuildBlocks, bool UseTempBlocks, bool ForceUpload, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::filesystem::path AttachmentTempPath; if (UseTempBlocks) { AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); CreateDirectories(AttachmentTempPath); } AsyncRemoteResult RemoteResult; RwLock AttachmentsLock; std::unordered_set LargeAttachments; std::unordered_map CreatedBlocks; auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(BlockHash.ToHexString()); if (!std::filesystem::exists(BlockPath)) { IoBuffer BlockBuffer; try { BasicFile BlockFile; uint32_t RetriesLeft = 3; BlockFile.Open(BlockPath, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { if (RetriesLeft == 0) { return false; } ZEN_WARN("Failed to create temporary oplog block '{}', reason: '{}', retries left: {}.", BlockPath, Ec.message(), RetriesLeft); Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); uint64_t Offset = 0; for (const SharedBuffer& Buffer : CompressedBlock.GetCompressed().GetSegments()) { BlockFile.Write(Buffer.GetView(), Offset); Offset += Buffer.GetSize(); } void* FileHandle = BlockFile.Detach(); BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); } catch (std::exception& Ex) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), Ex.what(), "Unable to create temp block file"); return; } BlockBuffer.SetDeleteOnClose(true); { RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)}); } ZEN_DEBUG("Saved temp block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); } }; auto UploadBlock = [&RemoteStore, &RemoteResult](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) { RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ZEN_WARN("Failed to save attachment ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); }; std::vector> BlockChunks; auto OnBlockChunks = [&BlockChunks](const std::unordered_set& Chunks) { BlockChunks.push_back({Chunks.begin(), Chunks.end()}); ZEN_DEBUG("Found {} block chunks", Chunks.size()); }; auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments](const IoHash& AttachmentHash) { { RwLock::ExclusiveLockScope _(AttachmentsLock); LargeAttachments.insert(AttachmentHash); } ZEN_DEBUG("Found attachment {}", AttachmentHash); }; std::function OnBlock; if (UseTempBlocks) { OnBlock = MakeTempBlock; } else { OnBlock = UploadBlock; } std::vector KnownBlocks; if (BuildBlocks) { ReportMessage(OptionalContext, "Loading oplog base container"); RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer(); if (BaseContainerResult.ErrorCode != static_cast(HttpResponseCode::NoContent)) { if (BaseContainerResult.ErrorCode) { ZEN_WARN("Failed to load oplog base container, reason: '{}', error code: {}", BaseContainerResult.Reason, BaseContainerResult.ErrorCode); } else { CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView(); KnownBlocks.reserve(BlocksArray.Num()); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); std::vector ChunksInBlock; CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); if (BlockHash == IoHash::Zero) { continue; } ChunksInBlock.reserve(ChunksArray.Num()); for (CbFieldView ChunkField : ChunksArray) { ChunksInBlock.push_back(ChunkField.AsHash()); } KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)}); }; } ReportMessage(OptionalContext, fmt::format("Loading oplog base container in {:.3} s", BaseContainerResult.ElapsedSeconds)); } } tsl::robin_map TempAttachments; CbObject OplogContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, BuildBlocks, KnownBlocks, WorkerPool, OnBlock, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles ? &TempAttachments : nullptr, OptionalContext, /* out */ RemoteResult); if (!RemoteResult.IsError()) { if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Text = "Operation cancelled"}; return Result; } uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); ReportMessage(OptionalContext, fmt::format("Saving oplog container with {} attachments and {} blocks...", ChunkCount, BlockCount)); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer()); if (ContainerSaveResult.ErrorCode) { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); ZEN_WARN("Failed to save oplog container ({}). Reason: '{}'", RemoteResult.GetErrorReason(), RemoteResult.GetError()); } else { ZEN_DEBUG("Saved container in {}", NiceTimeSpanMs(static_cast(ContainerSaveResult.ElapsedSeconds * 1000))); } UploadAttachments(WorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, TempAttachments, ContainerSaveResult.Needs, ForceUpload, RemoteResult, OptionalContext); while (!RemoteResult.IsError()) { if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Text = "Operation cancelled"}; return Result; } ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); ZEN_WARN("Failed to finalize oplog container {} ({}). Reason: '{}'", ContainerSaveResult.RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); } ZEN_DEBUG("Finalized container in {}", NiceTimeSpanMs(static_cast(ContainerFinalizeResult.ElapsedSeconds * 1000))); if (ContainerFinalizeResult.Needs.empty()) { break; } if (IsCancelled(OptionalContext)) { RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Text = "Operation cancelled"}; return Result; } ReportMessage(OptionalContext, fmt::format("Finalize reported {} missing attachments...", ContainerFinalizeResult.Needs.size())); UploadAttachments(WorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, TempAttachments, ContainerFinalizeResult.Needs, false, RemoteResult, OptionalContext); } TempAttachments.clear(); CreatedBlocks.clear(); } RemoteProjectStore::Result Result = RemoteResult.ConvertResult(); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ZEN_INFO("Saved oplog {} in {}", RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); return Result; }; RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function& HasAttachment, const std::function&& Chunks)>& OnNeedBlock, const std::function& OnNeedAttachment, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; size_t NeedAttachmentCount = 0; CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); if (HasAttachment(AttachmentHash)) { continue; } OnNeedAttachment(AttachmentHash); }; ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachements", NeedAttachmentCount, LargeChunksArray.Num())); size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); if (BlockHash == IoHash::Zero) { std::vector NeededChunks; NeededChunks.reserve(ChunksArray.GetSize()); for (CbFieldView ChunkField : ChunksArray) { IoHash ChunkHash = ChunkField.AsBinaryAttachment(); if (HasAttachment(ChunkHash)) { continue; } NeededChunks.emplace_back(ChunkHash); } if (!NeededChunks.empty()) { OnNeedBlock(IoHash::Zero, std::move(NeededChunks)); } continue; } for (CbFieldView ChunkField : ChunksArray) { IoHash ChunkHash = ChunkField.AsHash(); if (HasAttachment(ChunkHash)) { continue; } OnNeedBlock(BlockHash, {}); break; } }; ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); CbObject SectionObject = LoadCompactBinaryObject(SectionPayload); if (!SectionObject) { ZEN_WARN("Failed to save oplog container. Reason: '{}'", "Section has unexpected data type"); return RemoteProjectStore::Result{gsl::narrow(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Section has unexpected data type", "Failed to save oplog container"}; } CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpsArray.Num())); for (CbFieldView OpEntry : OpsArray) { CbObjectView Core = OpEntry.AsObjectView(); BinaryWriter Writer; Core.CopyTo(Writer); MemoryView OpView = Writer.GetView(); IoBuffer OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize()); CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); const uint32_t OpLsn = Oplog.AppendNewOplogEntry(Op); if (OpLsn == ProjectStore::Oplog::kInvalidOp) { return RemoteProjectStore::Result{gsl::narrow(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.500, "Failed saving op", "Failed to save oplog container"}; } ZEN_DEBUG("oplog entry #{}", OpLsn); } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500}; } RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::unordered_set Attachments; std::vector> ChunksInBlocks; RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); if (LoadContainerResult.ErrorCode) { ZEN_WARN("Failed to load oplog container, reason: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } ReportMessage(OptionalContext, fmt::format("Loaded container in {}", NiceTimeSpanMs(static_cast(LoadContainerResult.ElapsedSeconds * 1000)))); AsyncRemoteResult RemoteResult; Latch AttachmentsWorkLatch(1); std::atomic_size_t AttachmentCount = 0; auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) { return !ForceDownload && ChunkStore.ContainsChunk(RawHash); }; auto OnNeedBlock = [&RemoteStore, &ChunkStore, &WorkerPool, &ChunksInBlocks, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult]( const IoHash& BlockHash, std::vector&& Chunks) { if (BlockHash == IoHash::Zero) { AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks)]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); if (Result.ErrorCode) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ZEN_WARN("Failed to load attachments with {} chunks ({}). Reason: '{}'", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } ZEN_DEBUG("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000))); for (const auto& It : Result.Chunks) { ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly); } }); return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); ZEN_WARN("Failed to load oplog container, missing attachment {} ({}). Reason: '{}'", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } ZEN_DEBUG("Loaded block attachment in {}", NiceTimeSpanMs(static_cast(BlockResult.ElapsedSeconds * 1000))); if (!IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); })) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), fmt::format("Invalid format for block {}", BlockHash), {}); ZEN_WARN("Failed to load oplog container, attachment {} has invalid format ({}). Reason: '{}'", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } }); }; auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &WorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount]( const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; } AttachmentsWorkLatch.AddCount(1); AttachmentCount.fetch_add(1); WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash]() { auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); if (AttachmentResult.ErrorCode) { RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); ZEN_WARN("Failed to download attachment {}, reason: '{}', error code: {}", RawHash, AttachmentResult.Reason, AttachmentResult.ErrorCode); return; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(AttachmentResult.ElapsedSeconds * 1000))); ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); }); }; RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OptionalContext); if (!Attachments.empty()) { ReportMessage(OptionalContext, fmt::format("Found {} attachments to download", Attachments.size())); } AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } } ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); } if (AttachmentCount.load() > 0) { ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) { Result = RemoteResult.ConvertResult(); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.500; ReportMessage(OptionalContext, fmt::format("Loaded oplog {} in {}", RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)))); return Result; } ////////////////////////////////////////////////////////////////////////// // These are here to avoid vtable leakage RemoteProjectStore::RemoteProjectStore() { } RemoteProjectStore::~RemoteProjectStore() { } } // namespace zen