// Copyright Epic Games, Inc. All Rights Reserved.
//
// NOTE(review): this file appears to have been mangled by an extraction pass that
// stripped the contents of every <...> sequence: the #include targets below are
// empty, and template argument lists throughout (std::atomic<...>, std::vector<...>,
// std::function<...>, reinterpret_cast<...>, gsl::narrow<...>, std::span<...>, and
// the nested std::vector<std::pair<...>> types) are missing. The code is kept
// byte-identical here; the original headers and template arguments must be
// restored from version control before this translation unit can compile.
#include "remoteprojectstore.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace zen {

/*
	On-disk / on-wire layout produced by this file:

	OplogContainer
		Binary("ops")	// Compressed CompactBinary object to hide attachment references, also makes the oplog smaller
		{
			CbArray("ops")
			{
				CbObject Op (CbFieldType::BinaryAttachment Attachments[]) (OpData)
			}
		}
		CbArray("blocks")
			CbObject
				CbFieldType::BinaryAttachment "rawhash"	// Optional, only if we are creating blocks (Jupiter/File)
				CbArray("chunks")
					CbFieldType::Hash	// Chunk hashes
				CbArray("chunks")	// Optional, only if we are not creating blocks (Zen)
					CbFieldType::BinaryAttachment	// Chunk attachment hashes
		CbArray("chunkedfiles");
			CbFieldType::Hash "rawhash"
			CbFieldType::Integer "rawsize"
			CbArray("chunks");
				CbFieldType::Hash "chunkhash"
			CbArray("sequence");
				CbFieldType::Integer chunks index

	CompressedBinary ChunkBlock
	{
		VarUInt ChunkCount
		VarUInt ChunkSizes[ChunkCount]
		uint8_t[chunksize])[ChunkCount]
	}
*/

//////////////////////////////  AsyncRemoteResult

// First-error-wins result slot shared by concurrently scheduled worker tasks.
// The error code doubles as the "has an error been recorded" flag: 0 means no
// error, any non-zero value means an error was recorded.
struct AsyncRemoteResult
{
	// Records an error exactly once. A zero ErrorCode is mapped to -1 so the
	// stored value is always non-zero once an error has been reported, keeping
	// IsError() meaningful. Later calls leave the first recorded error intact.
	//
	// NOTE(review): compare_exchange_weak may fail spuriously even when
	// m_ErrorCode still holds 0, in which case the very first error would be
	// silently dropped; compare_exchange_strong looks intended here — confirm.
	// NOTE(review): a reader polling IsError() on another thread can observe
	// the code before m_ErrorReason/m_ErrorText have been assigned below;
	// benign only if the strings are read after all workers have drained.
	// NOTE(review): ErrorText is taken as `const std::string` by value — the
	// const blocks moving from it; `const std::string&` is the conventional form.
	void SetError(int32_t ErrorCode, const std::string& ErrorReason, const std::string ErrorText)
	{
		int32_t Expected = 0;
		if (m_ErrorCode.compare_exchange_weak(Expected, ErrorCode ? ErrorCode : -1))
		{
			// Only the winning thread ever writes the strings.
			m_ErrorReason = ErrorReason;
			m_ErrorText = ErrorText;
		}
	}
	// True once any thread has successfully recorded an error.
	bool IsError() const { return m_ErrorCode.load() != 0; }
	// The recorded error code (0 when no error has been recorded yet).
	int GetError() const { return m_ErrorCode.load(); };
	// Human-readable reason; only meaningful once IsError() is true.
	const std::string& GetErrorReason() const { return m_ErrorReason; };
	// Extended error detail (e.g. exception text); only meaningful once IsError() is true.
	const std::string& GetErrorText() const { return m_ErrorText; };
	// Packages the recorded state into the public Result type, attaching the
	// caller-measured elapsed time.
	RemoteProjectStore::Result ConvertResult(double ElapsedSeconds = 0.0) const
	{
		return RemoteProjectStore::Result{m_ErrorCode, ElapsedSeconds, m_ErrorReason, m_ErrorText};
	}

private:
	std::atomic m_ErrorCode = 0;	// 0 = no error; first non-zero write wins
	std::string m_ErrorReason;
	std::string m_ErrorText;
};

// Forwards a percentage-complete progress update to the optional job context.
// Percentage is derived from Total/Remaining; Total must be positive.
void ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, ptrdiff_t Total, ptrdiff_t Remaining)
{
	if (OptionalContext)
	{
		ZEN_ASSERT(Total > 0);
		OptionalContext->ReportProgress(CurrentOp, gsl::narrow((100 * (Total - Remaining)) / Total));
	}
}

// Forwards a status message to the optional job context and always logs it.
void ReportMessage(JobContext* OptionalContext, std::string_view Message)
{
	if (OptionalContext)
	{
		OptionalContext->ReportMessage(Message);
	}
	ZEN_INFO("{}", Message);
}

// Cancellation probe that tolerates a null context (no context = never cancelled).
bool IsCancelled(JobContext* OptionalContext)
{
	if (!OptionalContext)
	{
		return false;
	}
	return OptionalContext->IsCancelled();
}

// Decodes a ChunkBlock (see layout comment at the top of the file) and invokes
// Visitor once per embedded chunk with the chunk's CompressedBuffer and its
// decoded raw hash. Returns false if any embedded chunk fails to parse as a
// valid compressed buffer. The outer block itself is decompressed without
// validation (FromCompressedNoValidate); only the per-chunk headers are checked.
bool IterateBlock(IoBuffer&& CompressedBlock, std::function Visitor)
{
	IoBuffer BlockPayload = CompressedBuffer::FromCompressedNoValidate(std::move(CompressedBlock)).Decompress().AsIoBuffer();
	MemoryView BlockView = BlockPayload.GetView();
	const uint8_t* ReadPtr = reinterpret_cast(BlockView.GetData());
	uint32_t NumberSize;
	// Header: varuint chunk count followed by one varuint size per chunk.
	uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
	ReadPtr += NumberSize;
	std::vector ChunkSizes;
	ChunkSizes.reserve(ChunkCount);
	while (ChunkCount--)
	{
		ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
		ReadPtr += NumberSize;
	}
	// Sanity check only: the header must have consumed at least one byte.
	ptrdiff_t TempBufferLength = std::distance(reinterpret_cast(BlockView.GetData()), ReadPtr);
	ZEN_ASSERT(TempBufferLength > 0);
	for (uint64_t ChunkSize : ChunkSizes)
	{
		// Wrap (no copy) the chunk bytes in place inside the decompressed payload.
		IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
		IoHash AttachmentRawHash;
		uint64_t AttachmentRawSize;
		CompressedBuffer
CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
		if (!CompressedChunk)
		{
			ZEN_ERROR("Invalid chunk in block");
			return false;
		}
		Visitor(std::move(CompressedChunk), AttachmentRawHash);
		ReadPtr += ChunkSize;
		ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
	}
	return true;
};

// Assembles a ChunkBlock (varuint count, varuint sizes, then the chunk bytes —
// see the layout comment at the top of the file) from a list of
// (hash, fetch-callback) pairs, fetching each chunk's payload on demand.
// Segment 0 of the composite is reserved up front for the header and patched
// in once all chunk sizes are known. The result is wrapped in a
// CompressedBuffer using the None level so the block can be re-parsed cheaply.
CompressedBuffer GenerateBlock(std::vector>&& FetchChunks)
{
	std::vector ChunkSegments;
	ChunkSegments.resize(1);	// placeholder for the header segment, filled in below
	ChunkSegments.reserve(1 + FetchChunks.size());
	size_t ChunkCount = FetchChunks.size();
	{
		// Worst-case varuint is 9 bytes per chunk size.
		// NOTE(review): the buffer must also hold the leading ChunkCount varuint,
		// so the true worst case is (ChunkCount + 1) * 9 bytes; only the
		// ZEN_ASSERT below guards against overflow — confirm the bound.
		IoBuffer TempBuffer(ChunkCount * 9);
		MutableMemoryView View = TempBuffer.GetMutableView();
		uint8_t* BufferStartPtr = reinterpret_cast(View.GetData());
		uint8_t* BufferEndPtr = BufferStartPtr;
		BufferEndPtr += WriteVarUInt(gsl::narrow(ChunkCount), BufferEndPtr);
		for (const auto& It : FetchChunks)
		{
			// It.second is the fetch callback, It.first the chunk's raw hash.
			CompositeBuffer Chunk = It.second(It.first);
			uint64_t ChunkSize = 0;
			std::span Segments = Chunk.GetSegments();
			for (const SharedBuffer& Segment : Segments)
			{
				ChunkSize += Segment.GetSize();
				ChunkSegments.push_back(Segment);
			}
			BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
		}
		ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
		ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
		// Patch the finished header into the reserved slot.
		ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow(TempBufferLength)));
	}
	CompressedBuffer CompressedBlock =
		CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
	return CompressedBlock;
}

// A generated (or reused) block: its compressed-payload hash plus the raw
// hashes of every chunk packed inside it.
struct Block
{
	IoHash BlockHash;
	std::vector ChunksInBlock;
};

// Schedules asynchronous assembly of one block on the worker pool. Counts the
// latch up before scheduling; the scheduled task counts it down via a scope
// guard even on early-out. The task generates the block, records its hash into
// Blocks[BlockIndex] under a shared lock, and hands the compressed payload to
// AsyncOnBlock. Skips all work if another task already recorded an error.
//
// NOTE(review): Blocks, SectionsLock, AsyncOnBlock and RemoteResult are
// captured by reference — the caller must keep them alive until the latch is
// released.
void CreateBlock(WorkerThreadPool& WorkerPool, Latch& OpSectionsLatch, std::vector>&& ChunksInBlock, RwLock& SectionsLock, std::vector& Blocks, size_t BlockIndex, const std::function& AsyncOnBlock, AsyncRemoteResult& RemoteResult)
{
	OpSectionsLatch.AddCount(1);
	WorkerPool.ScheduleWork(
		[&Blocks, &SectionsLock, &OpSectionsLatch, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, &RemoteResult]() mutable {
			auto _ = MakeGuard([&OpSectionsLatch] { OpSectionsLatch.CountDown(); });
			if (RemoteResult.IsError())
			{
				return;
			}
			ZEN_ASSERT(!Chunks.empty());
			size_t ChunkCount = Chunks.size();
			Stopwatch Timer;
			CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks));
			IoHash BlockHash = CompressedBlock.DecodeRawHash();
			{
				// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
				RwLock::SharedLockScope __(SectionsLock);
				Blocks[BlockIndex].BlockHash = BlockHash;
			}
			uint64_t BlockSize = CompressedBlock.GetCompressedSize();
			AsyncOnBlock(std::move(CompressedBlock), BlockHash);
			ZEN_INFO("Generated block with {} attachments in {} ({})", ChunkCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(BlockSize));
		});
}

// Appends one default-constructed Block under the exclusive lock and returns
// its index; worker tasks later fill that slot in under a shared lock.
static size_t AddBlock(RwLock& BlocksLock, std::vector& Blocks)
{
	size_t BlockIndex;
	{
		RwLock::ExclusiveLockScope _(BlocksLock);
		BlockIndex = Blocks.size();
		Blocks.resize(BlockIndex + 1);
	}
	return BlockIndex;
}

// Spools a compressed block to a delete-on-close temp file at Path and returns
// a file-backed IoBuffer over it. If a file of exactly the expected compressed
// size already exists at Path it is reused instead of rewritten. Opening the
// file is retried up to three times.
//
// NOTE(review): the parameter name shadows the CompressedBuffer type inside
// this function — confusing, and blocks naming the type in the body.
static IoBuffer WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path)
{
	if (std::filesystem::is_regular_file(Path))
	{
		IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path));
		if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize())
		{
			// Same size as what we were about to write: assume identical content and reuse.
			ExistingTempFile.SetDeleteOnClose(true);
			return ExistingTempFile;
		}
	}
	IoBuffer BlockBuffer;
	BasicFile BlockFile;
	uint32_t RetriesLeft = 3;
	BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) {
		// Retry callback: returning true asks Open to try again.
		if (RetriesLeft == 0)
		{
			return false;
		}
		ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft);
		// NOTE(review): the original comment here said "Total 600 ms", but this
		// expression evaluates to 100, 0, -100 ms across the three retries;
		// `100 + (3 - RetriesLeft) * 100` (100+200+300 = 600 ms) was almost
		// certainly intended — confirm, and check Sleep's handling of negatives.
		Sleep(100 - (3 - RetriesLeft) * 100);
		RetriesLeft--;
		return true;
	});
	uint64_t Offset = 0;
	{
		// Write every segment of the compressed payload sequentially.
		CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed();
		BasicFileWriter BlockWriter(BlockFile, 64u * 1024u);
		for (const SharedBuffer& Segment : Compressed.GetSegments())
		{
			size_t SegmentSize = Segment.GetSize();
BlockWriter.Write(Segment.GetData(), SegmentSize, Offset); Offset += SegmentSize; } } void* FileHandle = BlockFile.Detach(); BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); BlockBuffer.SetDeleteOnClose(true); return BlockBuffer; } CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, const std::vector& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function>&&)>& OnBlockChunks, bool EmbedLooseFiles, JobContext* OptionalContext, AsyncRemoteResult& RemoteResult) { using namespace std::literals; size_t OpCount = 0; CbObject OplogContainerObject; { struct FoundAttachment { std::filesystem::path RawPath; // If not stored in cid uint64_t Size = 0; Oid Key = Oid::Zero; }; std::unordered_map UploadAttachments; RwLock BlocksLock; std::vector Blocks; CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); CreateDirectories(AttachmentTempPath); auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function& CB) { bool OpRewritten = false; CbArrayView Files = Op["files"sv].AsArrayView(); if (Files.Num() == 0) { CB(Op); return; } CbWriter Cbo; Cbo.BeginArray("files"sv); for (CbFieldView& Field : Files) { if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); CB(Op); return; } bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) { IoHash DataHash = View["data"sv].AsHash(); if (DataHash == IoHash::Zero) { std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = 
Project.RootDir / ServerPath; if (!std::filesystem::is_regular_file(FilePath)) { ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); if (IgnoreMissingAttachments) { continue; } else { ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); Sb.Append(FilePath.string()); Sb.Append("' for op: \n"); View.ToJson(Sb); throw std::runtime_error(Sb.ToString()); } } { Stopwatch HashTimer; SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); ZEN_INFO("Hashed loose file '{}' {}: {} in {}", FilePath, NiceBytes(DataBuffer.GetSize()), DataHash, NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); } // Rewrite file array entry with new data reference CbObjectWriter Writer; RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { // omit this field as we will write it explicitly ourselves return true; } return false; }); Writer.AddBinaryAttachment("data"sv, DataHash); UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); CbObject RewrittenOp = Writer.Save(); Cbo.AddObject(std::move(RewrittenOp)); CopyField = false; } } if (CopyField) { Cbo.AddField(Field); } else { OpRewritten = true; } } if (!OpRewritten) { CB(Op); return; } Cbo.EndArray(); CbArray FilesArray = Cbo.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { if (Field.GetName() == "files"sv) { NewWriter.AddArray("files"sv, FilesArray); return true; } return false; }); CB(RewrittenOp); }; ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); Stopwatch Timer; size_t TotalOpCount = Oplog.GetOplogEntryCount(); CompressedBuffer CompressedOpsSection; { Stopwatch RewriteOplogTimer; CbObjectWriter SectionOpsWriter; SectionOpsWriter.BeginArray("ops"sv); { 
Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { if (RemoteResult.IsError()) { return; } Op.IterateAttachments([&](CbFieldView FieldView) { UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); }); if (EmbedLooseFiles) { RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); } else { SectionOpsWriter << Op; } OpCount++; if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } if (OpCount % 1000 == 0) { ReportProgress(OptionalContext, fmt::format("Building oplog: {} ops processed", OpCount), TotalOpCount, TotalOpCount - OpCount); } }); if (RemoteResult.IsError()) { return {}; } if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } if (TotalOpCount > 0) { ReportProgress(OptionalContext, fmt::format("Building oplog: {} ops processed", OpCount), TotalOpCount, 0); } } SectionOpsWriter.EndArray(); // "ops" ReportMessage(OptionalContext, fmt::format("Rewrote {} ops to new oplog in {}", OpCount, NiceTimeSpanMs(static_cast(RewriteOplogTimer.GetElapsedTimeMs())))); { Stopwatch CompressOpsTimer; CompressedOpsSection = CompressedBuffer::Compress(SectionOpsWriter.Save().GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); ReportMessage(OptionalContext, fmt::format("Compressed oplog section {} ({} -> {}) in {}", CompressedOpsSection.DecodeRawHash(), NiceBytes(CompressedOpsSection.DecodeRawSize()), NiceBytes(CompressedOpsSection.GetCompressedSize()), NiceTimeSpanMs(static_cast(CompressOpsTimer.GetElapsedTimeMs())))); } } if (IsCancelled(OptionalContext)) { 
RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } auto FindReuseBlocks = [](const std::vector& KnownBlocks, const std::unordered_set& Attachments, JobContext* OptionalContext) -> std::vector { std::vector ReuseBlockIndexes; if (!Attachments.empty() && !KnownBlocks.empty()) { ReportMessage( OptionalContext, fmt::format("Checking {} Attachments against {} known blocks for reuse", Attachments.size(), KnownBlocks.size())); Stopwatch ReuseTimer; for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; size_t BlockAttachmentCount = KnownBlock.ChunksInBlock.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (Attachments.contains(KnownHash)) { FoundAttachmentCount++; } } size_t ReusePercent = (FoundAttachmentCount * 100) / BlockAttachmentCount; // TODO: Configure reuse-level if (ReusePercent > 80) { ZEN_DEBUG("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); ReuseBlockIndexes.push_back(KnownBlockIndex); } else if (FoundAttachmentCount > 0) { ZEN_DEBUG("Skipping block {}. 
{} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); } } } return ReuseBlockIndexes; }; std::unordered_set FoundHashes; FoundHashes.reserve(UploadAttachments.size()); for (const auto& It : UploadAttachments) { FoundHashes.insert(It.first); } size_t ReusedAttachmentCount = 0; std::vector ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (UploadAttachments.erase(KnownHash) == 1) { ReusedAttachmentCount++; } } } struct ChunkedFile { IoBuffer Source; ChunkedInfoWithSource Chunked; tsl::robin_map ChunkLoookup; }; std::vector ChunkedFiles; auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { ChunkedFile Chunked; Stopwatch Timer; uint64_t Offset = FileRef.FileChunkOffset; uint64_t Size = FileRef.FileChunkSize; BasicFile SourceFile; SourceFile.Attach(FileRef.FileHandle); auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); Chunked.Source = RawData; ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", RawHash, NiceBytes(Chunked.Chunked.Info.RawSize), Chunked.Chunked.Info.ChunkHashes.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return Chunked; }; RwLock ResolveLock; std::unordered_set ChunkedHashes; std::unordered_set LargeChunkHashes; std::unordered_map ChunkedUploadAttachments; std::unordered_map LooseUploadAttachments; std::unordered_set MissingHashes; ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); Latch ResolveAttachmentsLatch(1); for (auto& It : UploadAttachments) { if (IsCancelled(OptionalContext)) { 
RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } ResolveAttachmentsLatch.AddCount(1); WorkerPool.ScheduleWork([&ChunkStore, UploadAttachment = &It.second, RawHash = It.first, &ResolveAttachmentsLatch, &ResolveLock, &ChunkedHashes, &LargeChunkHashes, &ChunkedUploadAttachments, &LooseUploadAttachments, &MissingHashes, &OnLargeAttachment, &AttachmentTempPath, &ChunkFile, &ChunkedFiles, MaxChunkEmbedSize, ChunkFileSizeLimit, &RemoteResult, OptionalContext]() { auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); try { if (IsCancelled(OptionalContext)) { return; } if (!UploadAttachment->RawPath.empty()) { const std::filesystem::path& FilePath = UploadAttachment->RawPath; IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); if (RawData) { if (RawData.GetSize() > ChunkFileSizeLimit) { IoBufferFileReference FileRef; (void)RawData.GetFileReference(FileRef); ChunkedFile Chunked = ChunkFile(RawHash, RawData, FileRef, OptionalContext); ResolveLock.WithExclusiveLock( [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) { ChunkedHashes.insert(ChunkHash); } ChunkedFiles.emplace_back(std::move(Chunked)); }); } else if (RawData.GetSize() > (MaxChunkEmbedSize * 2)) { // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't // it will be a loose attachment instead of going into a block OnLargeAttachment(RawHash, [RawData = std::move(RawData), AttachmentTempPath](const IoHash& RawHash) { size_t RawSize = RawData.GetSize(); CompressedBuffer Compressed = 
CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); return TempAttachmentBuffer; }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { size_t RawSize = RawData.GetSize(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); if (Compressed.GetCompressedSize() > MaxChunkEmbedSize) { OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { UploadAttachment->Size = Compressed.GetCompressedSize(); ResolveLock.WithExclusiveLock( [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data)); }); } } } else { ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } else { IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); if (Data) { auto GetForChunking = [](size_t ChunkFileSizeLimit, const IoBuffer& Data, IoBufferFileReference& OutFileRef) -> bool { if (Data.IsWholeFile()) { IoHash VerifyRawHash; uint64_t VerifyRawSize; CompressedBuffer Compressed = 
CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); if (Compressed) { if (VerifyRawSize > ChunkFileSizeLimit) { OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { if (CompressionLevel == OodleCompressionLevel::None) { CompositeBuffer Decompressed = Compressed.DecompressToComposite(); if (Decompressed) { std::span Segments = Decompressed.GetSegments(); if (Segments.size() == 1) { IoBuffer DecompressedData = Segments[0].AsIoBuffer(); if (DecompressedData.GetFileReference(OutFileRef)) { return true; } } } } } } } } return false; }; IoBufferFileReference FileRef; if (GetForChunking(ChunkFileSizeLimit, Data, FileRef)) { ChunkedFile Chunked = ChunkFile(RawHash, Data, FileRef, OptionalContext); ResolveLock.WithExclusiveLock( [RawHash, &ChunkedFiles, &ChunkedUploadAttachments, &ChunkedHashes, &Chunked]() { ChunkedUploadAttachments.insert_or_assign(RawHash, ChunkedFiles.size()); ChunkedHashes.reserve(ChunkedHashes.size() + Chunked.Chunked.Info.ChunkHashes.size()); for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) { ChunkedHashes.insert(ChunkHash); } ChunkedFiles.emplace_back(std::move(Chunked)); }); } else if (Data.GetSize() > MaxChunkEmbedSize) { OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) { return ChunkStore.FindChunkByCid(RawHash); }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { UploadAttachment->Size = Data.GetSize(); } } else { ResolveLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } } catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), fmt::format("Failed to resolve attachment {}", RawHash), Ex.what()); } }); } ResolveAttachmentsLatch.CountDown(); while (!ResolveAttachmentsLatch.Wait(1000)) { ptrdiff_t Remaining = 
ResolveAttachmentsLatch.Remaining(); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); while (!ResolveAttachmentsLatch.Wait(1000)) { Remaining = ResolveAttachmentsLatch.Remaining(); ReportProgress(OptionalContext, fmt::format("Aborting, {} attachments remaining...", Remaining), UploadAttachments.size(), Remaining); } ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0); return {}; } ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", Remaining), UploadAttachments.size(), Remaining); } if (UploadAttachments.size() > 0) { ReportProgress(OptionalContext, fmt::format("Resolving attachments, {} remaining...", 0), UploadAttachments.size(), 0); } if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } for (const IoHash& AttachmentHash : MissingHashes) { auto It = UploadAttachments.find(AttachmentHash); ZEN_ASSERT(It != UploadAttachments.end()); std::optional Op = Oplog.GetOpByKey(It->second.Key); ZEN_ASSERT(Op.has_value()); if (IgnoreMissingAttachments) { ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); } else { ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); Sb.Append(AttachmentHash.ToHexString()); Sb.Append("' for op: \n"); Op.value().ToJson(Sb); RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), Sb.ToString(), {}); return {}; } UploadAttachments.erase(AttachmentHash); } for (const auto& It : ChunkedUploadAttachments) { UploadAttachments.erase(It.first); } for (const auto& It : 
LargeChunkHashes) { UploadAttachments.erase(It); } std::vector ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { const Block& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunksInBlock) { if (ChunkedHashes.erase(KnownHash) == 1) { ReusedAttachmentCount++; } } } ReusedBlockIndexes.insert(ReusedBlockIndexes.end(), ReusedBlockFromChunking.begin(), ReusedBlockFromChunking.end()); std::sort(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); auto UniqueKnownBlocksEnd = std::unique(ReusedBlockIndexes.begin(), ReusedBlockIndexes.end()); size_t ReuseBlockCount = std::distance(ReusedBlockIndexes.begin(), UniqueKnownBlocksEnd); if (ReuseBlockCount > 0) { Blocks.reserve(ReuseBlockCount); for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) { Blocks.push_back(KnownBlocks[*It]); } ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); } std::vector> SortedUploadAttachments; SortedUploadAttachments.reserve(UploadAttachments.size()); for (const auto& It : UploadAttachments) { SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); } if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); // Sort attachments so we get predictable blocks for the same oplog upload std::sort(SortedUploadAttachments.begin(), SortedUploadAttachments.end(), [](const std::pair& Lhs, const std::pair& Rhs) { if (Lhs.second == Rhs.second) { // Same key, sort by raw hash return Lhs.first < Rhs.first; } // Sort by key return Lhs.second < 
Rhs.second; }); std::vector ChunkedFilesOrder; ChunkedFilesOrder.reserve(ChunkedFiles.size()); for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) { ChunkedFilesOrder.push_back(Index); } std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; }); // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded // ChunkedHashes contains all chunked up chunks to be composed into blocks if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } ReportMessage(OptionalContext, fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", SortedUploadAttachments.size(), ChunkedHashes.size(), TotalOpCount)); if (IsCancelled(OptionalContext)) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } // SortedUploadAttachments now contains all whole chunks with size to be composed into blocks and uploaded // ChunkedHashes contains all chunked up chunks to be composed into blocks size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedHashes.size(); size_t ChunksAssembled = 0; ReportMessage(OptionalContext, fmt::format("Assembling {} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); Latch BlockCreateLatch(1); size_t GeneratedBlockCount = 0; size_t BlockSize = 0; std::vector> ChunksInBlock; Oid LastOpKey = Oid::Zero; uint32_t ComposedBlocks = 0; uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); try { uint64_t FetchAttachmentsStartMS = 
Timer.GetElapsedTimeMs();
// Attachment hashes gathered for the block currently being assembled
// (deduplicates chunks within a block).
std::unordered_set BlockAttachmentHashes;
// Flushes the chunks accumulated so far into a new block entry. When
// BuildBlocks is set the block is composed on the worker pool via
// CreateBlock; otherwise the chunk list is handed to OnBlockChunks for
// bulk handling. Per-block accumulators are reset afterwards.
auto NewBlock = [&]() {
    size_t BlockIndex = AddBlock(BlocksLock, Blocks);
    size_t ChunkCount = ChunksInBlock.size();
    if (BuildBlocks)
    {
        CreateBlock(WorkerPool, BlockCreateLatch, std::move(ChunksInBlock), BlocksLock, Blocks, BlockIndex, AsyncOnBlock, RemoteResult);
        ComposedBlocks++;
    }
    else
    {
        ZEN_INFO("Bulk group {} attachments", BlockAttachmentHashes.size());
        OnBlockChunks(std::move(ChunksInBlock));
    }
    {
        // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
        // NOTE(review): the statement below appends to ChunksInBlock, not BlockHash —
        // the comment above and the code disagree; confirm the shared lock is still
        // sufficient for concurrent writers at distinct indices.
        RwLock::SharedLockScope _(BlocksLock);
        Blocks[BlockIndex].ChunksInBlock.insert(Blocks[BlockIndex].ChunksInBlock.end(), BlockAttachmentHashes.begin(), BlockAttachmentHashes.end());
    }
    uint64_t NowMS = Timer.GetElapsedTimeMs();
    ZEN_INFO("Assembled block {} with {} chunks in {} ({})", BlockIndex, ChunkCount, NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), NiceBytes(BlockSize));
    FetchAttachmentsStartMS = NowMS;
    // Reset per-block state for the next block.
    BlockAttachmentHashes.clear();
    ChunksInBlock.clear();
    BlockSize = 0;
    GeneratedBlockCount++;
};
// First pass: group the regular upload attachments (sorted so attachments of
// the same op stay together) into blocks of up to MaxBlockSize bytes.
for (auto HashIt = SortedUploadAttachments.begin(); HashIt != SortedUploadAttachments.end(); HashIt++)
{
    if (IsCancelled(OptionalContext))
    {
        RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
        ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        break;
    }
    // Throttled progress reporting (once per 1000 chunks).
    if (ChunksAssembled % 1000 == 0)
    {
        ReportProgress(
            OptionalContext,
            fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
            ChunkAssembleCount,
            ChunkAssembleCount - ChunksAssembled);
    }
    const IoHash& RawHash(HashIt->first);
    const Oid CurrentOpKey = HashIt->second;
    const IoHash& AttachmentHash(HashIt->first);
    auto InfoIt = UploadAttachments.find(RawHash);
    ZEN_ASSERT(InfoIt != UploadAttachments.end());
    uint64_t PayloadSize = InfoIt->second.Size;
    // Only add each attachment once per block.
    if (BlockAttachmentHashes.insert(AttachmentHash).second)
    {
        if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
        {
            // Loose attachment: capture its buffer directly and drop it from the
            // pending loose set.
            ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) { return CompositeBuffer(IoBuffer); }));
            LooseUploadAttachments.erase(It);
        }
        else
        {
            // Otherwise fetch lazily from the chunk store when the block is built.
            ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) { return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); }));
        }
        BlockSize += PayloadSize;
        // Only cut a block at an op boundary so one op's attachments stay together.
        if (BlockSize >= MaxBlockSize && (CurrentOpKey != LastOpKey))
        {
            NewBlock();
        }
        LastOpKey = CurrentOpKey;
        ChunksAssembled++;
    }
}
if (!RemoteResult.IsError())
{
    // Keep the chunked files as separate blocks to make the blocks generated
    // more consistent
    if (BlockSize > 0)
    {
        NewBlock();
    }
    // Second pass: add the chunks produced by file chunking, compressing each
    // chunk slice lazily from its source file region.
    for (size_t ChunkedFileIndex : ChunkedFilesOrder)
    {
        const ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex];
        const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked;
        size_t ChunkCount = Chunked.Info.ChunkHashes.size();
        for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++)
        {
            if (IsCancelled(OptionalContext))
            {
                RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
                ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
                break;
            }
            if (ChunksAssembled % 1000 == 0)
            {
                ReportProgress(OptionalContext,
                    fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
                    ChunkAssembleCount,
                    ChunkAssembleCount - ChunksAssembled);
            }
            const IoHash& ChunkHash = ChunkedFile.Chunked.Info.ChunkHashes[ChunkIndex];
            if (auto FindIt = ChunkedHashes.find(ChunkHash); FindIt != ChunkedHashes.end())
            {
                if (BlockAttachmentHashes.insert(ChunkHash).second)
                {
                    const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
                    // Lazily compress the (Offset, Size) slice of the source file
                    // with the "None" Oodle level (framing only, no real compression).
                    ChunksInBlock.emplace_back(std::make_pair(
                        ChunkHash, [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) {
                            return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), OodleCompressor::Mermaid, OodleCompressionLevel::None)
                                .GetCompressed();
                        }));
                    BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
                    if (BuildBlocks)
                    {
                        if (BlockSize >= MaxBlockSize)
                        {
                            NewBlock();
                        }
                    }
                    ChunksAssembled++;
                }
                ChunkedHashes.erase(FindIt);
            }
        }
    }
}
// Flush any partially filled final block.
if (BlockSize > 0 && !RemoteResult.IsError())
{
    if (!IsCancelled(OptionalContext))
    {
        NewBlock();
    }
}
if (ChunkAssembleCount > 0)
{
    ReportProgress(
        OptionalContext,
        fmt::format("Assembling blocks: {} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
        ChunkAssembleCount,
        0);
}
ReportMessage(OptionalContext,
    fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", ChunkAssembleCount, TotalOpCount,
        GeneratedBlockCount, NiceTimeSpanMs(static_cast(Timer.GetElapsedTimeMs()))));
if (IsCancelled(OptionalContext))
{
    // Cancelled: release our latch count and drain outstanding block-creation
    // work before returning an empty container.
    RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
    ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
    BlockCreateLatch.CountDown();
    while (!BlockCreateLatch.Wait(1000))
    {
        ptrdiff_t Remaining = BlockCreateLatch.Remaining();
        ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
    }
    if (GeneratedBlockCount > 0)
    {
        ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", 0), GeneratedBlockCount, 0);
    }
    return {};
}
}
catch (const std::exception& Ex)
{
    // On failure, still wait for in-flight block creation before propagating.
    BlockCreateLatch.CountDown();
    while (!BlockCreateLatch.Wait(1000))
    {
    }
    RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), "Block creation failed", Ex.what());
    throw;
}
// Normal path: wait for all async block creation, reporting progress and
// honoring cancellation while waiting.
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
{
    ptrdiff_t Remaining = BlockCreateLatch.Remaining();
    if (IsCancelled(OptionalContext))
    {
        RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
        ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        while (!BlockCreateLatch.Wait(1000))
        {
            Remaining = BlockCreateLatch.Remaining();
            ReportProgress(OptionalContext, fmt::format("Aborting, {} blocks remaining...", Remaining), GeneratedBlockCount, Remaining);
        }
        ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
        return {};
    }
    ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", Remaining), GeneratedBlockCount, Remaining);
}
if (GeneratedBlockCount > 0)
{
    uint64_t NowMS = Timer.GetElapsedTimeMs();
    ReportProgress(OptionalContext, fmt::format("Creating blocks, {} remaining...", 0), GeneratedBlockCount, 0);
    ReportMessage(OptionalContext, fmt::format("Created {} blocks in {}", GeneratedBlockCount, NiceTimeSpanMs(NowMS - CreateBlocksStartMS)));
}
if (!RemoteResult.IsError())
{
    // Serialize the oplog container (see the container layout comment at the
    // top of this file): compressed ops section first, then the block manifest.
    CbObjectWriter OplogContinerWriter;
    RwLock::SharedLockScope _(BlocksLock);
    OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
    OplogContinerWriter.BeginArray("blocks"sv);
    {
        for (const Block& B : Blocks)
        {
            ZEN_ASSERT(!B.ChunksInBlock.empty());
            if (BuildBlocks)
            {
                // Composed block: reference the block payload as an attachment
                // and list the raw chunk hashes it contains.
                ZEN_ASSERT(B.BlockHash != IoHash::Zero);
                OplogContinerWriter.BeginObject();
                {
                    OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash);
                    OplogContinerWriter.BeginArray("chunks"sv);
                    {
                        for (const IoHash& RawHash : B.ChunksInBlock)
                        {
                            OplogContinerWriter.AddHash(RawHash);
                        }
                    }
                    OplogContinerWriter.EndArray(); // "chunks"
                }
                OplogContinerWriter.EndObject();
                continue;
            }
            // Non-composed block (no payload): every chunk is its own attachment.
            ZEN_ASSERT(B.BlockHash == IoHash::Zero);
            OplogContinerWriter.BeginObject();
            {
                OplogContinerWriter.BeginArray("chunks"sv);
                {
                    for (const IoHash& RawHash : B.ChunksInBlock)
                    {
                        OplogContinerWriter.AddBinaryAttachment(RawHash);
                    }
                }
                OplogContinerWriter.EndArray();
            }
            OplogContinerWriter.EndObject();
        }
    }
    OplogContinerWriter.EndArray(); // "blocks"sv
// Serialize the chunked-file manifest: each entry records the whole file's
// raw hash/size, the chunk hashes, and the index sequence used to rebuild it.
OplogContinerWriter.BeginArray("chunkedfiles"sv);
{
    for (const ChunkedFile& F : ChunkedFiles)
    {
        OplogContinerWriter.BeginObject();
        {
            OplogContinerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash);
            OplogContinerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize);
            OplogContinerWriter.BeginArray("chunks"sv);
            {
                for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes)
                {
                    OplogContinerWriter.AddHash(RawHash);
                }
            }
            OplogContinerWriter.EndArray(); // "chunks"
            OplogContinerWriter.BeginArray("sequence"sv);
            {
                for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence)
                {
                    OplogContinerWriter.AddInteger(ChunkIndex);
                }
            }
            OplogContinerWriter.EndArray(); // "sequence"
        }
        OplogContinerWriter.EndObject();
    }
}
OplogContinerWriter.EndArray(); // "chunkedfiles"sv
// Large chunks that are uploaded individually, referenced as attachments.
OplogContinerWriter.BeginArray("chunks"sv);
{
    for (const IoHash& AttachmentHash : LargeChunkHashes)
    {
        OplogContinerWriter.AddBinaryAttachment(AttachmentHash);
    }
}
OplogContinerWriter.EndArray(); // "chunks"
OplogContainerObject = OplogContinerWriter.Save();
}
}
return OplogContainerObject;
}

// Convenience overload: builds the oplog container on the large worker pool
// with no pre-known base blocks and no job context, and converts the async
// error state into a LoadContainerResult for the caller.
RemoteProjectStore::LoadContainerResult BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function>&&)>& OnBlockChunks, bool EmbedLooseFiles)
{
    WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
    AsyncRemoteResult RemoteResult;
    CbObject ContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, ChunkFileSizeLimit, BuildBlocks, IgnoreMissingAttachments, {}, WorkerPool, AsyncOnBlock, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles, nullptr, RemoteResult);
    return RemoteProjectStore::LoadContainerResult{RemoteResult.ConvertResult(), ContainerObject};
}

// Aggregated statistics for one upload session. Counters are atomic because
// they are updated from worker-pool threads (declaration continues below).
struct UploadInfo
{
    uint64_t OplogSizeBytes = 0;
    std::atomic AttachmentsUploaded = 0;
    std::atomic
AttachmentBlocksUploaded = 0;
std::atomic AttachmentBytesUploaded = 0;
std::atomic AttachmentBlockBytesUploaded = 0;
};

// Uploads the attachments the remote store reported as needed (or everything
// when ForceAll is set). Three categories are handled: created block files,
// large loose attachments, and per-block bulk chunk groups. Work is fanned
// out on WorkerPool; failures and cancellation are funneled through
// RemoteResult, and statistics are accumulated in Info.
void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set& LargeAttachments, const std::vector>>& BlockChunks, const std::unordered_map& CreatedBlocks, const tsl::robin_map& LooseFileAttachments, const std::unordered_set& Needs, bool ForceAll, UploadInfo& Info, AsyncRemoteResult& RemoteResult, JobContext* OptionalContext)
{
    using namespace std::literals;
    if (Needs.empty() && !ForceAll)
    {
        return;
    }
    ReportMessage(OptionalContext, "Filtering needed attachments for upload...");
    std::unordered_set AttachmentsToUpload;
    std::unordered_map BulkBlockAttachmentsToUpload;
    size_t BlockAttachmentCountToUpload = 0;
    size_t LargeAttachmentCountToUpload = 0;
    size_t BulkAttachmentCountToUpload = 0;
    AttachmentsToUpload.reserve(ForceAll ? CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
    // Anything in Needs that we cannot map to a known attachment is fatal below.
    std::unordered_set UnknownAttachments(Needs);
    for (const auto& CreatedBlock : CreatedBlocks)
    {
        if (ForceAll || Needs.contains(CreatedBlock.first))
        {
            AttachmentsToUpload.insert(CreatedBlock.first);
            BlockAttachmentCountToUpload++;
            UnknownAttachments.erase(CreatedBlock.first);
        }
    }
    for (const IoHash& LargeAttachment : LargeAttachments)
    {
        if (ForceAll || Needs.contains(LargeAttachment))
        {
            AttachmentsToUpload.insert(LargeAttachment);
            LargeAttachmentCountToUpload++;
            UnknownAttachments.erase(LargeAttachment);
        }
    }
    for (const std::vector>& BlockHashes : BlockChunks)
    {
        for (const std::pair& Chunk : BlockHashes)
        {
            if (ForceAll || Needs.contains(Chunk.first))
            {
                BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second));
                BulkAttachmentCountToUpload++;
                UnknownAttachments.erase(Chunk.first);
            }
        }
    }
    if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty())
    {
        ReportMessage(OptionalContext, "No attachments needed");
        return;
    }
    if (!UnknownAttachments.empty())
    {
        // The remote asked for attachments we no longer have (stale base container).
        RemoteResult.SetError(
            gsl::narrow(HttpResponseCode::NotFound),
            fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", UnknownAttachments.size()),
            "");
        ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        return;
    }
    if (IsCancelled(OptionalContext))
    {
        if (!RemoteResult.IsError())
        {
            RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
            ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        }
        return;
    }
    ReportMessage(OptionalContext,
        fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)",
            AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload,
            BulkAttachmentCountToUpload));
    Stopwatch Timer;
    ptrdiff_t AttachmentsToSave(0);
    // Latch starts at 1 so it cannot release while we are still scheduling;
    // the matching CountDown happens after all work is queued.
    Latch SaveAttachmentsLatch(1);
    for (const IoHash& RawHash : AttachmentsToUpload)
    {
        if (RemoteResult.IsError())
        {
            break;
        }
        SaveAttachmentsLatch.AddCount(1);
        AttachmentsToSave++;
        WorkerPool.ScheduleWork([&ChunkStore, &RemoteStore, &SaveAttachmentsLatch, &RemoteResult, RawHash, &CreatedBlocks, &LooseFileAttachments, &Info, OptionalContext]() {
            auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
            if (RemoteResult.IsError())
            {
                return;
            }
            // Resolve the payload: created block file, loose temp-file
            // attachment, or fall back to the chunk store.
            bool IsBlock = false;
            IoBuffer Payload;
            if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
            {
                Payload = BlockIt->second;
                IsBlock = true;
            }
            else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end())
            {
                Payload = LooseTmpFileIt->second(RawHash);
            }
            else
            {
                Payload = ChunkStore.FindChunkByCid(RawHash);
            }
            if (!Payload)
            {
                RemoteResult.SetError(gsl::narrow(HttpResponseCode::NotFound), fmt::format("Failed to find attachment {}", RawHash), {});
                ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
                return;
            }
            size_t PayloadSize = Payload.GetSize();
            RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompositeBuffer(SharedBuffer(std::move(Payload))), RawHash);
            if (Result.ErrorCode)
            {
                RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
                ReportMessage(OptionalContext,
                    fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, NiceBytes(PayloadSize), RemoteResult.GetError(), RemoteResult.GetErrorReason()));
                return;
            }
            if (IsBlock)
            {
                Info.AttachmentBlocksUploaded.fetch_add(1);
                Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize);
                ZEN_INFO("Saved block attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(PayloadSize));
            }
            else
            {
                Info.AttachmentsUploaded.fetch_add(1);
                Info.AttachmentBytesUploaded.fetch_add(PayloadSize);
                ZEN_INFO("Saved large attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(PayloadSize));
            }
            return;
        });
    }
    if (IsCancelled(OptionalContext))
    {
        // NOTE(review): this returns without draining SaveAttachmentsLatch while
        // already-scheduled workers still hold a reference to it (a local) —
        // looks like a potential dangling reference unless Latch's destructor
        // waits; confirm against the Latch implementation.
        if (!RemoteResult.IsError())
        {
            RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
            ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        }
        return;
    }
    if (!BulkBlockAttachmentsToUpload.empty())
    {
        // Bulk path: upload each block's needed chunks together, skipping any
        // chunk already covered by the individual-attachment pass above.
        for (const std::vector>& Chunks : BlockChunks)
        {
            if (RemoteResult.IsError())
            {
                break;
            }
            std::vector NeededChunks;
            NeededChunks.reserve(Chunks.size());
            for (const std::pair& Chunk : Chunks)
            {
                const IoHash& ChunkHash = Chunk.first;
                if (BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash))
                {
                    NeededChunks.push_back(Chunk.first);
                }
            }
            if (NeededChunks.empty())
            {
                continue;
            }
            SaveAttachmentsLatch.AddCount(1);
            AttachmentsToSave++;
            WorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &SaveAttachmentsLatch, &RemoteResult, NeededChunks = std::move(NeededChunks),
                                        &BulkBlockAttachmentsToUpload, &Info, OptionalContext]() {
                auto _ = MakeGuard([&SaveAttachmentsLatch] { SaveAttachmentsLatch.CountDown(); });
                size_t ChunksSize = 0;
                std::vector ChunkBuffers;
                ChunkBuffers.reserve(NeededChunks.size());
                for (const IoHash& Chunk : NeededChunks)
                {
                    auto It = BulkBlockAttachmentsToUpload.find(Chunk);
                    ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
                    CompositeBuffer ChunkPayload = It->second(It->first);
                    if (!ChunkPayload)
                    {
                        RemoteResult.SetError(static_cast(HttpResponseCode::NotFound),
                            fmt::format("Missing chunk {}"sv, Chunk),
                            fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk));
                        ChunkBuffers.clear();
                        break;
                    }
                    ChunksSize += ChunkPayload.GetSize();
                    ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
                }
                RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
                if (Result.ErrorCode)
                {
                    RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
                    ReportMessage(OptionalContext,
                        fmt::format("Failed to save attachments with {} chunks ({}): {}", NeededChunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason()));
                    return;
                }
                Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size());
                Info.AttachmentBytesUploaded.fetch_add(ChunksSize);
                ZEN_INFO("Saved {} bulk attachments in {} ({})", NeededChunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(ChunksSize));
            });
        }
    }
    // Release our scheduling hold and wait for all uploads, reporting progress
    // and flagging cancellation (workers observe RemoteResult and bail).
    SaveAttachmentsLatch.CountDown();
    while (!SaveAttachmentsLatch.Wait(1000))
    {
        ptrdiff_t Remaining = SaveAttachmentsLatch.Remaining();
        if (IsCancelled(OptionalContext))
        {
            if (!RemoteResult.IsError())
            {
                RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", "");
                ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
            }
        }
        ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", Remaining), AttachmentsToSave, Remaining);
    }
    if (AttachmentsToSave > 0)
    {
        ReportProgress(OptionalContext, fmt::format("Saving attachments, {} remaining...", 0), AttachmentsToSave, 0);
    }
    ReportMessage(OptionalContext,
        fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {}",
            AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload,
            BulkAttachmentCountToUpload, NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
}

// Saves an oplog to the remote store: builds the container (optionally using
// blocks that already exist remotely, discovered via the base container),
// saves and finalizes it, and uploads whatever attachments the remote needs.
RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool EmbedLooseFiles, bool ForceUpload, bool IgnoreMissingAttachments, JobContext* OptionalContext)
{
    using namespace std::literals;
    Stopwatch Timer;
    UploadInfo Info;
    WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
    WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
    const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
    // When requested by the store, blocks are staged as temp files first.
    std::filesystem::path AttachmentTempPath;
    if (RemoteStoreInfo.UseTempBlockFiles)
    {
        AttachmentTempPath = Oplog.TempPath();
        AttachmentTempPath.append(".pending");
        CreateDirectories(AttachmentTempPath);
    }
    AsyncRemoteResult RemoteResult;
    RwLock AttachmentsLock;
    std::unordered_set LargeAttachments;
    std::unordered_map CreatedBlocks;
    tsl::robin_map LooseLargeFiles;
    // Block sink used when the store wants temp block files: write the block
    // to disk and remember its buffer for the upload phase.
    auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
        std::filesystem::path BlockPath = AttachmentTempPath;
        BlockPath.append(BlockHash.ToHexString());
        try
        {
            IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock), BlockPath);
            RwLock::ExclusiveLockScope __(AttachmentsLock);
            // NOTE(review): BlockBuffer is moved into the map on the next line but
            // then read by GetSize() in the log call — use-after-move; the logged
            // size is likely wrong. Also the message prints the directory
            // (AttachmentTempPath) rather than BlockPath — confirm intent.
            CreatedBlocks.insert({BlockHash, std::move(BlockBuffer)});
            ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize()));
        }
        catch (const std::exception& Ex)
        {
            RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), Ex.what(), "Unable to create temp block file");
            return;
        }
    };
    // Block sink used otherwise: upload the compressed block immediately.
    auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, const IoHash& BlockHash) {
        RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash);
        if (Result.ErrorCode)
        {
            RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
            ReportMessage(OptionalContext, fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
            return;
        }
        Info.AttachmentBlocksUploaded.fetch_add(1);
        Info.AttachmentBlockBytesUploaded.fetch_add(CompressedBlock.GetCompressedSize());
        ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
    };
    // Chunk groups reported by the container build (used for bulk upload).
    std::vector>> BlockChunks;
    auto OnBlockChunks = [&BlockChunks](std::vector>&& Chunks) {
        BlockChunks.push_back({Chunks.begin(), Chunks.end()});
        ZEN_DEBUG("Found {} block chunks", Chunks.size());
    };
    // Records large attachments and their lazy buffer providers.
    auto OnLargeAttachment = [&AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, TGetAttachmentBufferFunc&& GetBufferFunc) {
        {
            RwLock::ExclusiveLockScope _(AttachmentsLock);
            LargeAttachments.insert(AttachmentHash);
            LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc));
        }
        ZEN_DEBUG("Found attachment {}", AttachmentHash);
    };
    std::function OnBlock;
    if (RemoteStoreInfo.UseTempBlockFiles)
    {
        OnBlock = MakeTempBlock;
    }
    else
    {
        OnBlock = UploadBlock;
    }
    // Discover blocks from the base container that already exist remotely so
    // the container build can reuse them instead of re-creating/uploading.
    std::vector KnownBlocks;
    if (RemoteStoreInfo.CreateBlocks && !RemoteStoreInfo.BaseContainerName.empty())
    {
        ReportMessage(OptionalContext, fmt::format("Loading oplog base container '{}'", RemoteStoreInfo.BaseContainerName));
        RemoteProjectStore::LoadContainerResult BaseContainerResult = RemoteStore.LoadBaseContainer();
        if (BaseContainerResult.ErrorCode != static_cast(HttpResponseCode::NoContent))
        {
            if (BaseContainerResult.ErrorCode)
            {
                // Base container unavailable: fall through with no known blocks,
                // which means everything gets uploaded.
                ReportMessage(OptionalContext,
                    fmt::format("Failed to load oplog base container '{}' ({}): {}, uploading all attachments", RemoteStoreInfo.BaseContainerName,
                        BaseContainerResult.ErrorCode, BaseContainerResult.Reason));
            }
            else
            {
                ReportMessage(OptionalContext,
                    fmt::format("Loaded oplog base container in {}", NiceTimeSpanMs(static_cast(BaseContainerResult.ElapsedSeconds * 1000.0))));
                CbArrayView BlocksArray = BaseContainerResult.ContainerObject["blocks"sv].AsArrayView();
                std::vector BlockHashes;
                BlockHashes.reserve(BlocksArray.Num());
                for (CbFieldView BlockField : BlocksArray)
                {
                    CbObjectView BlockView = BlockField.AsObjectView();
                    IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
                    BlockHashes.push_back(BlockHash);
                }
                // Ask the remote which of the base container's blocks still exist.
                RemoteProjectStore::HasAttachmentsResult HasResult = RemoteStore.HasAttachments(BlockHashes);
                if (HasResult.ErrorCode == 0)
                {
                    ReportMessage(OptionalContext,
                        fmt::format("Checked the existance of {} block{} in remote store, found {} existing blocks in {}", BlockHashes.size(),
                            BlockHashes.size() > 1 ? "s"sv : ""sv,
                            BlockHashes.size() - HasResult.Needs.size(), NiceTimeSpanMs(static_cast(HasResult.ElapsedSeconds * 1000.0))));
                    if (HasResult.Needs.size() < BlocksArray.Num())
                    {
                        KnownBlocks.reserve(BlocksArray.Num() - HasResult.Needs.size());
                        const std::unordered_set MissingBlocks(HasResult.Needs);
                        for (CbFieldView BlockField : BlocksArray)
                        {
                            CbObjectView BlockView = BlockField.AsObjectView();
                            IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
                            if (!MissingBlocks.contains(BlockHash))
                            {
                                std::vector ChunksInBlock;
                                CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
                                // Zero hash marks a non-composed block; those are not reusable.
                                if (BlockHash == IoHash::Zero)
                                {
                                    continue;
                                }
                                ChunksInBlock.reserve(ChunksArray.Num());
                                for (CbFieldView ChunkField : ChunksArray)
                                {
                                    ChunksInBlock.push_back(ChunkField.AsHash());
                                }
                                KnownBlocks.push_back({.BlockHash = BlockHash, .ChunksInBlock = std::move(ChunksInBlock)});
                            }
                        }
                    }
                }
                else
                {
                    ReportMessage(OptionalContext,
                        fmt::format("Unable to determine which blocks in base container exist in remote store, assuming none "
                                    "does: '{}', error code : {}",
                            HasResult.Reason, HasResult.ErrorCode));
                }
            }
        }
    }
    CbObject OplogContainerObject = BuildContainer(ChunkStore, Project, Oplog, MaxBlockSize, MaxChunkEmbedSize, ChunkFileSizeLimit,
        RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, KnownBlocks, WorkerPool, OnBlock, OnLargeAttachment, OnBlockChunks,
        EmbedLooseFiles, OptionalContext, /* out */ RemoteResult);
    if (!RemoteResult.IsError())
    {
        Info.OplogSizeBytes = OplogContainerObject.GetSize();
        if (IsCancelled(OptionalContext))
        {
            RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Text = "Operation cancelled"};
            ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
            return Result;
        }
        uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num();
        uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num();
        ReportMessage(OptionalContext,
            fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", RemoteStoreInfo.ContainerName, ChunkCount, BlockCount));
        RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(OplogContainerObject.GetBuffer().AsIoBuffer());
        if (ContainerSaveResult.ErrorCode)
        {
            RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
            ReportMessage(OptionalContext, fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
        }
        else
        {
            ReportMessage(OptionalContext,
                fmt::format("Saved container '{}' in {}", RemoteStoreInfo.ContainerName,
                    NiceTimeSpanMs(static_cast(ContainerSaveResult.ElapsedSeconds * 1000.0))));
        }
        // Upload what the save reported as needed (workers early-out if the
        // container save already failed, since RemoteResult carries the error).
        UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, LooseLargeFiles,
            ContainerSaveResult.Needs, ForceUpload, Info, RemoteResult, OptionalContext);
        // Finalize loop: the remote may report still-missing attachments; retry
        // uploads up to MaxTries before giving up.
        uint32_t Try = 0;
        while (!RemoteResult.IsError())
        {
            if (IsCancelled(OptionalContext))
            {
                RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Text = "Operation cancelled"};
                ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Text));
                return Result;
            }
            ReportMessage(OptionalContext, "Finalizing oplog container...");
            RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash);
            if (ContainerFinalizeResult.ErrorCode)
            {
                RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
                ReportMessage(OptionalContext,
                    fmt::format("Failed to finalize oplog container {} ({}): {}", ContainerSaveResult.RawHash, RemoteResult.GetError(),
                        RemoteResult.GetErrorReason()));
                RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
                return Result;
            }
            ReportMessage(OptionalContext,
                fmt::format("Finalized container '{}' in {}",
                    RemoteStoreInfo.ContainerName, NiceTimeSpanMs(static_cast(ContainerFinalizeResult.ElapsedSeconds * 1000.0))));
            if (ContainerFinalizeResult.Needs.empty())
            {
                break;
            }
            if (IsCancelled(OptionalContext))
            {
                RemoteProjectStore::Result Result = {.ErrorCode = 0, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Text = "Operation cancelled"};
                return Result;
            }
            const uint32_t MaxTries = 8;
            if (Try < MaxTries)
            {
                Try++;
                ReportMessage(
                    OptionalContext,
                    fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachements. Try {}",
                        RemoteStoreInfo.ContainerName, ContainerFinalizeResult.Needs.size(), Try));
                UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, LooseLargeFiles,
                    ContainerFinalizeResult.Needs, false, Info, RemoteResult, OptionalContext);
            }
            else
            {
                RemoteResult.SetError(
                    gsl::narrow(HttpResponseCode::InternalServerError),
                    "Failed to save oplog container",
                    fmt::format("Giving up finalize oplog container {} after {} retries, still getting reports of missing attachments",
                        ContainerSaveResult.RawHash, ContainerFinalizeResult.Needs.size()));
                ReportMessage(OptionalContext,
                    fmt::format("Failed to finalize oplog container container {} ({}): {}", ContainerSaveResult.RawHash, RemoteResult.GetError(),
                        RemoteResult.GetErrorReason()));
                break;
            }
        }
        LooseLargeFiles.clear();
        CreatedBlocks.clear();
    }
    RemoteProjectStore::Result Result = RemoteResult.ConvertResult();
    Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
    ReportMessage(OptionalContext,
        fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({})", RemoteStoreInfo.ContainerName,
            RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
            NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksUploaded.load(),
            NiceBytes(Info.AttachmentBlockBytesUploaded.load()), Info.AttachmentsUploaded.load(), NiceBytes(Info.AttachmentBytesUploaded.load())));
    return Result;
};

// Applies a previously built oplog container to a local oplog. The callbacks
// decide which attachments/blocks must be fetched (HasAttachment filters out
// what is already present locally), and the ops section is decompressed and
// written to the oplog in batches.
RemoteProjectStore::Result SaveOplogContainer(ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function& HasAttachment, const std::function&& Chunks)>& OnNeedBlock, const std::function& OnNeedAttachment, const std::function& OnChunkedAttachment, JobContext* OptionalContext)
{
    using namespace std::literals;
    Stopwatch Timer;
    // Pass 1: individually stored large attachments.
    size_t NeedAttachmentCount = 0;
    CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView();
    for (CbFieldView LargeChunksField : LargeChunksArray)
    {
        IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment();
        if (HasAttachment(AttachmentHash))
        {
            continue;
        }
        OnNeedAttachment(AttachmentHash);
        NeedAttachmentCount++;
    };
    ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", NeedAttachmentCount, LargeChunksArray.Num()));
    // Pass 2: blocks. A zero block hash marks a non-composed block whose
    // chunks are separate binary attachments; otherwise chunks are plain
    // hashes contained in the block payload.
    size_t NeedBlockCount = 0;
    CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
    for (CbFieldView BlockField : BlocksArray)
    {
        CbObjectView BlockView = BlockField.AsObjectView();
        IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment();
        CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView();
        std::vector NeededChunks;
        NeededChunks.reserve(ChunksArray.Num());
        if (BlockHash == IoHash::Zero)
        {
            for (CbFieldView ChunkField : ChunksArray)
            {
                IoHash ChunkHash = ChunkField.AsBinaryAttachment();
                if (HasAttachment(ChunkHash))
                {
                    continue;
                }
                NeededChunks.emplace_back(ChunkHash);
            }
        }
        else
        {
            for (CbFieldView ChunkField : ChunksArray)
            {
                const IoHash ChunkHash = ChunkField.AsHash();
                if (HasAttachment(ChunkHash))
                {
                    continue;
                }
                NeededChunks.emplace_back(ChunkHash);
            }
        }
        if (!NeededChunks.empty())
        {
            OnNeedBlock(BlockHash, std::move(NeededChunks));
            // Only composed blocks count toward the block tally.
            if (BlockHash
                != IoHash::Zero)
            {
                NeedBlockCount++;
            }
        }
    }
    // Pass 3: chunked-file manifests — parse and hand each to the callback.
    CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView();
    for (CbFieldView ChunkedFileField : ChunkedFilesArray)
    {
        CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView();
        ChunkedInfo Chunked;
        Chunked.RawHash = ChunkedFileView["rawhash"sv].AsHash();
        Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
        CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
        Chunked.ChunkHashes.reserve(ChunksArray.Num());
        for (CbFieldView ChunkField : ChunksArray)
        {
            const IoHash ChunkHash = ChunkField.AsHash();
            Chunked.ChunkHashes.emplace_back(ChunkHash);
        }
        CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
        Chunked.ChunkSequence.reserve(SequenceArray.Num());
        for (CbFieldView SequenceField : SequenceArray)
        {
            uint32_t SequenceIndex = SequenceField.AsUInt32();
            // Every sequence entry must index into the chunk hash list.
            ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
            Chunked.ChunkSequence.push_back(SequenceIndex);
        }
        OnChunkedAttachment(Chunked);
        ZEN_INFO("Found chunked attachment '{}' ({}) built from {} chunks", Chunked.RawHash, NiceBytes(Chunked.RawSize), Chunked.ChunkHashes.size());
    }
    ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num()));
    // Decompress the ops section and append the ops to the local oplog.
    MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
    IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
    IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
    {
        CbObject SectionObject = LoadCompactBinaryObject(SectionPayload);
        if (!SectionObject)
        {
            ReportMessage(OptionalContext, fmt::format("Failed to save oplog container: '{}'", "Section has unexpected data type"));
            return RemoteProjectStore::Result{gsl::narrow(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.0,
                "Section has unexpected data type", "Failed to save oplog container"};
        }
        CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView();
        const uint64_t OpCount = OpsArray.Num();
        ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount));
        const size_t OpsBatchSize = 8192;
        std::vector OpsData;
        std::vector OpDataOffsets;
        size_t OpsCompleteCount = 0;
        // NOTE(review): OpsData holds raw op bytes while OpsBatchSize counts ops —
        // this reserve looks like it reserves bytes for an op count; confirm intent.
        OpsData.reserve(OpsBatchSize);
        // Flushes the buffered ops (views into OpsData at OpDataOffsets) to the
        // oplog in one append, then resets the batch buffers.
        auto AppendBatch = [&]() {
            std::vector Ops;
            Ops.reserve(OpDataOffsets.size());
            for (size_t OpDataOffset : OpDataOffsets)
            {
                Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset]));
            }
            std::vector OpLsns = Oplog.AppendNewOplogEntries(Ops);
            OpsCompleteCount += OpLsns.size();
            OpsData.clear();
            OpDataOffsets.clear();
            ReportProgress(OptionalContext, fmt::format("Writing oplog, {} remaining...", OpCount - OpsCompleteCount), OpCount, OpCount - OpsCompleteCount);
        };
        BinaryWriter Writer;
        for (CbFieldView OpEntry : OpsArray)
        {
            CbObjectView Op = OpEntry.AsObjectView();
            Op.CopyTo(Writer);
            OpDataOffsets.push_back(OpsData.size());
            OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize());
            Writer.Reset();
            if (OpDataOffsets.size() == OpsBatchSize)
            {
                AppendBatch();
            }
        }
        if (!OpDataOffsets.empty())
        {
            AppendBatch();
        }
    }
    return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0};
}

// Loads an oplog from the remote store: fetches the container, downloads
// needed attachments/blocks on the network worker pool, and stores chunks
// into the local chunk store. (Definition continues beyond this excerpt.)
RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Oplog& Oplog, bool ForceDownload, bool IgnoreMissingAttachments, JobContext* OptionalContext)
{
    using namespace std::literals;
    // Download-side statistics, updated from worker threads (hence atomics).
    struct DownloadInfo
    {
        uint64_t OplogSizeBytes = 0;
        std::atomic AttachmentsDownloaded = 0;
        std::atomic AttachmentBlocksDownloaded = 0;
        std::atomic AttachmentBytesDownloaded = 0;
        std::atomic AttachmentBlockBytesDownloaded = 0;
        std::atomic AttachmentsStored = 0;
        std::atomic AttachmentBytesStored = 0;
        std::atomic_size_t MissingAttachmentCount = 0;
    };
    DownloadInfo Info;
    Stopwatch Timer;
    WorkerThreadPool& WorkerPool = GetLargeWorkerPool();
    WorkerThreadPool& NetworkWorkerPool = GetMediumWorkerPool();
    std::unordered_set Attachments;
// Fetch the container manifest; abort the whole load if that fails.
RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
if (LoadContainerResult.ErrorCode)
{
    ReportMessage(
        OptionalContext,
        fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
    return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
        .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text};
}
ReportMessage(OptionalContext,
    fmt::format("Loaded container in {} ({})", NiceTimeSpanMs(static_cast(LoadContainerResult.ElapsedSeconds * 1000)),
        NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize();
AsyncRemoteResult RemoteResult;
Latch AttachmentsWorkLatch(1);
std::atomic_size_t AttachmentCount = 0;
// Local-presence check; ForceDownload disables it so everything re-downloads.
auto HasAttachment = [&ChunkStore, ForceDownload](const IoHash& RawHash) {
    if (ForceDownload)
    {
        return false;
    }
    if (ChunkStore.ContainsChunk(RawHash))
    {
        return true;
    }
    return false;
};
// Downloads a block's needed chunks. Zero block hash = non-composed block:
// fetch the chunks as a bulk attachment request; otherwise download the
// composed block payload and unpack the wanted chunks from it.
auto OnNeedBlock = [&RemoteStore, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, &AttachmentCount, &RemoteResult, &Info, IgnoreMissingAttachments, OptionalContext](const IoHash& BlockHash, std::vector&& Chunks) {
    if (RemoteResult.IsError())
    {
        return;
    }
    if (BlockHash == IoHash::Zero)
    {
        AttachmentsWorkLatch.AddCount(1);
        AttachmentCount.fetch_add(1);
        NetworkWorkerPool.ScheduleWork([&RemoteStore, &ChunkStore, &AttachmentsWorkLatch, &RemoteResult, Chunks = std::move(Chunks), &Info, IgnoreMissingAttachments, OptionalContext]() {
            auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
            if (RemoteResult.IsError())
            {
                return;
            }
            RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
            if (Result.ErrorCode)
            {
                ReportMessage(OptionalContext,
                    fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason()));
                Info.MissingAttachmentCount.fetch_add(1);
                // NOTE(review): this sets the error when IgnoreMissingAttachments is
                // TRUE — the opposite of the composed-block path below, which uses
                // `if (!IgnoreMissingAttachments)`. Looks like an inverted condition.
                if (IgnoreMissingAttachments)
                {
                    RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
                }
                return;
            }
            Info.AttachmentsDownloaded.fetch_add(Chunks.size());
            ZEN_INFO("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)));
            if (RemoteResult.IsError())
            {
                return;
            }
            // Store each downloaded chunk; only count it if it was new locally.
            for (const auto& It : Result.Chunks)
            {
                uint64_t ChunkSize = It.second.GetCompressedSize();
                Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
                CidStore::InsertResult InsertResult = ChunkStore.AddChunk(It.second.GetCompressed().Flatten().AsIoBuffer(), It.first, CidStore::InsertMode::kCopyOnly);
                if (InsertResult.New)
                {
                    Info.AttachmentBytesStored.fetch_add(ChunkSize);
                    Info.AttachmentsStored.fetch_add(1);
                }
            }
        });
        return;
    }
    // Composed block: download the block attachment and unpack wanted chunks.
    AttachmentsWorkLatch.AddCount(1);
    AttachmentCount.fetch_add(1);
    NetworkWorkerPool.ScheduleWork([&AttachmentsWorkLatch, &ChunkStore, &RemoteStore, BlockHash, &RemoteResult, Chunks = std::move(Chunks), &Info, IgnoreMissingAttachments, OptionalContext]() {
        auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
        if (RemoteResult.IsError())
        {
            return;
        }
        RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
        if (BlockResult.ErrorCode)
        {
            ReportMessage(OptionalContext,
                fmt::format("Failed to download block attachment {} ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
            Info.MissingAttachmentCount.fetch_add(1);
            if (!IgnoreMissingAttachments)
            {
                RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
            }
            return;
        }
        if (RemoteResult.IsError())
        {
            return;
        }
        uint64_t BlockSize = BlockResult.Bytes.GetSize();
        Info.AttachmentBlocksDownloaded.fetch_add(1);
        ZEN_INFO("Loaded block attachment '{}' in {} ({})", BlockHash, NiceTimeSpanMs(static_cast(BlockResult.ElapsedSeconds * 1000)), NiceBytes(BlockSize));
        Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
        // Walk the block payload and store only the chunks we actually need.
        std::unordered_set WantedChunks;
        WantedChunks.reserve(Chunks.size());
        WantedChunks.insert(Chunks.begin(), Chunks.end());
        bool StoreChunksOK = IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &WantedChunks, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
            if (WantedChunks.contains(AttachmentRawHash))
            {
                uint64_t ChunkSize = Chunk.GetCompressedSize();
                CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
                if (InsertResult.New)
                {
                    Info.AttachmentBytesStored.fetch_add(ChunkSize);
                    Info.AttachmentsStored.fetch_add(1);
                }
                WantedChunks.erase(AttachmentRawHash);
            }
        });
        if (!StoreChunksOK)
        {
            ReportMessage(OptionalContext,
                fmt::format("Block attachment {} has invalid format ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
            RemoteResult.SetError(gsl::narrow(HttpResponseCode::InternalServerError), fmt::format("Invalid format for block {}", BlockHash), {});
            return;
        }
        // Every wanted chunk must have been found inside the block.
        ZEN_ASSERT(WantedChunks.empty());
    });
};
// Downloads an individual large attachment (deduplicated via Attachments).
auto OnNeedAttachment = [&RemoteStore, &ChunkStore, &NetworkWorkerPool, &AttachmentsWorkLatch, &RemoteResult, &Attachments, &AttachmentCount, &Info, IgnoreMissingAttachments, OptionalContext](const IoHash& RawHash) {
    if (!Attachments.insert(RawHash).second)
    {
        return;
    }
    if (RemoteResult.IsError())
    {
        return;
    }
    AttachmentsWorkLatch.AddCount(1);
    AttachmentCount.fetch_add(1);
    NetworkWorkerPool.ScheduleWork(
        [&RemoteStore, &ChunkStore, &RemoteResult, &AttachmentsWorkLatch, RawHash, &Info, IgnoreMissingAttachments, OptionalContext]() {
            auto _ = MakeGuard([&AttachmentsWorkLatch] { AttachmentsWorkLatch.CountDown(); });
            if (RemoteResult.IsError())
            {
                return;
            }
            RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
            if (AttachmentResult.ErrorCode)
            {
ReportMessage(OptionalContext, fmt::format("Failed to download large attachment {}: '{}', error code : {}", RawHash, AttachmentResult.Reason, AttachmentResult.ErrorCode)); Info.MissingAttachmentCount.fetch_add(1); if (!IgnoreMissingAttachments) { RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); } return; } uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); ZEN_INFO("Loaded large attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(AttachmentResult.ElapsedSeconds * 1000)), NiceBytes(AttachmentSize)); Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(AttachmentSize); Info.AttachmentsStored.fetch_add(1); } }); }; std::vector FilesToDechunk; auto OnChunkedAttachment = [&Oplog, &ChunkStore, &FilesToDechunk, ForceDownload](const ChunkedInfo& Chunked) { if (ForceDownload || !ChunkStore.ContainsChunk(Chunked.RawHash)) { Oplog.CaptureAddedAttachments(Chunked.ChunkHashes); FilesToDechunk.push_back(Chunked); } }; RemoteProjectStore::Result Result = SaveOplogContainer(Oplog, LoadContainerResult.ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, OptionalContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } ReportMessage(OptionalContext, fmt::format("Wrote oplog in {}, found {} attachments to download", NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), Attachments.size())); AttachmentsWorkLatch.CountDown(); while (!AttachmentsWorkLatch.Wait(1000)) { ptrdiff_t Remaining = AttachmentsWorkLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); } } 
ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", Remaining), AttachmentCount.load(), Remaining); } if (AttachmentCount.load() > 0) { ReportProgress(OptionalContext, fmt::format("Loading attachments, {} remaining...", 0), AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) { if (!FilesToDechunk.empty()) { ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); Latch DechunkLatch(1); std::filesystem::path TempFilePath = Oplog.TempPath(); for (const ChunkedInfo& Chunked : FilesToDechunk) { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); WorkerPool.ScheduleWork([&ChunkStore, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, IgnoreMissingAttachments, &Info, OptionalContext]() { auto _ = MakeGuard([&DechunkLatch] { DechunkLatch.CountDown(); }); if (RemoteResult.IsError()) { return; } Stopwatch Timer; IoBuffer TmpBuffer; { BasicFile TmpFile; TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate); { BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); uint64_t Offset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { ReportMessage(OptionalContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); TmpFile.Close(); std::error_code Ec; std::filesystem::remove(TempFileName, Ec); if (Ec) { ZEN_INFO("Failed to remove temporary file '{}'. 
Reason: {}", TempFileName, Ec.message()); } if (!IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow(HttpResponseCode::NotFound), "Missing chunk", fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); } return; } CompositeBuffer Decompressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)).DecompressToComposite(); for (const SharedBuffer& Segment : Decompressed.GetSegments()) { MemoryView SegmentData = Segment.GetView(); HashingStream.Append(SegmentData); TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), Offset); Offset += SegmentData.GetSize(); } } BLAKE3 RawHash = HashingStream.GetHash(); ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } TmpFile.Flush(); uint64_t TmpFileSize = TmpFile.FileSize(); TmpBuffer = IoBuffer(IoBuffer::File, TmpFile.Detach(), 0, TmpFileSize, /*IsWholeFile*/ true); IoHash ValidateRawHash; uint64_t ValidateRawSize = 0; ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(TmpBuffer, ValidateRawHash, ValidateRawSize)); ZEN_ASSERT(ValidateRawHash == Chunked.RawHash); ZEN_ASSERT(ValidateRawSize == Chunked.RawSize); } CidStore::InsertResult InsertResult = ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(TmpBuffer.GetSize()); Info.AttachmentsStored.fetch_add(1); } ZEN_INFO("Dechunked attachment {} ({}) in {}", Chunked.RawHash, NiceBytes(Chunked.RawSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); } DechunkLatch.CountDown(); while (!DechunkLatch.Wait(1000)) { ptrdiff_t Remaining = DechunkLatch.Remaining(); if (IsCancelled(OptionalContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow(HttpResponseCode::OK), "Operation cancelled", ""); ReportMessage(OptionalContext, fmt::format("Aborting ({}): 
{}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } ReportProgress(OptionalContext, fmt::format("Dechunking attachments, {} remaining...", Remaining), FilesToDechunk.size(), Remaining); } ReportProgress(OptionalContext, fmt::format("Dechunking attachments, {} remaining...", 0), FilesToDechunk.size(), 0); } Result = RemoteResult.ConvertResult(); } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; ReportMessage(OptionalContext, fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}), Stored: {} ({}), Missing: {}", RemoteStoreInfo.ContainerName, Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksDownloaded.load(), NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), Info.AttachmentsDownloaded.load(), NiceBytes(Info.AttachmentBytesDownloaded.load()), Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Info.MissingAttachmentCount.load())); return Result; } ////////////////////////////////////////////////////////////////////////// // These are here to avoid vtable leakage RemoteProjectStore::RemoteProjectStore() { } RemoteProjectStore::~RemoteProjectStore() { } } // namespace zen