// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include # include #endif // ZEN_WITH_TESTS namespace zen { /* OplogContainer Binary("ops") // Compressed CompactBinary object to hide attachment references, also makes the oplog smaller { CbArray("ops") { CbObject Op (CbFieldType::BinaryAttachment Attachments[]) (OpData) } } CbArray("blocks") CbObject CbFieldType::BinaryAttachment "rawhash" // Optional, only if we are creating blocks (Jupiter/File) CbArray("chunks") CbFieldType::Hash // Chunk hashes CbArray("chunks") // Optional, only if we are not creating blocks (Zen) CbFieldType::BinaryAttachment // Chunk attachment hashes CbArray("chunkedfiles"); CbFieldType::Hash "rawhash" CbFieldType::Integer "rawsize" CbArray("chunks"); CbFieldType::Hash "chunkhash" CbArray("sequence"); CbFieldType::Integer chunks index CompressedBinary ChunkBlock { VarUInt ChunkCount VarUInt ChunkSizes[ChunkCount] uint8_t[chunksize])[ChunkCount] } */ namespace remotestore_impl { using namespace std::literals; void ReportProgress(JobContext* OptionalContext, std::string_view CurrentOp, std::string_view Details, ptrdiff_t Total, ptrdiff_t Remaining, uint64_t ElapsedTimeMS) { if (OptionalContext) { ZEN_ASSERT(Total > 0); OptionalContext->ReportProgress(CurrentOp, Details, Total, Remaining, ElapsedTimeMS); } } void ReportMessage(JobContext* OptionalContext, std::string_view Message) { if (OptionalContext) { OptionalContext->ReportMessage(Message); } ZEN_INFO("{}", Message); } bool IsCancelled(JobContext* OptionalContext) { if (!OptionalContext) { return false; } return OptionalContext->IsCancelled(); } std::string GetStats(const RemoteProjectStore::Stats& Stats, const BuildStorageCache::Statistics* OptionalCacheStats, uint64_t ElapsedWallTimeMS) { 
uint64_t SentBytes = Stats.m_SentBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesWritten.load() : 0); uint64_t ReceivedBytes = Stats.m_ReceivedBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesRead.load() : 0); return fmt::format("Sent: {} ({}bits/s) Recv: {} ({}bits/s)", NiceBytes(SentBytes), NiceNum(ElapsedWallTimeMS > 0u ? static_cast((SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), NiceBytes(ReceivedBytes), NiceNum(ElapsedWallTimeMS > 0u ? static_cast((ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); } void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) { ZEN_INFO("Oplog request count: {}. Average request size: {}. Average request time: {}, Peak request speed: {}bits/s", Stats.m_RequestCount, NiceBytes(Stats.m_RequestCount > 0 ? (Stats.m_ReceivedBytes + Stats.m_SentBytes) / Stats.m_RequestCount : 0u), NiceLatencyNs(Stats.m_RequestCount > 0 ? (Stats.m_RequestTimeNS / Stats.m_RequestCount) : 0u), NiceNum(Stats.m_PeakBytesPerSec)); ZEN_INFO( "Oplog sent request avg: {} ({}bits/s). Peak: {}", NiceBytes(Stats.m_RequestCount > 0u ? static_cast((Stats.m_SentBytes) / Stats.m_RequestCount) : 0u), NiceNum(Stats.m_RequestTimeNS > 0u ? static_cast((Stats.m_SentBytes * 8 * 1000000000) / Stats.m_RequestTimeNS) : 0u), NiceBytes(Stats.m_PeakSentBytes)); ZEN_INFO( "Oplog recv request avg: {} ({}bits/s). Peak: {}", NiceBytes(Stats.m_RequestCount > 0u ? static_cast((Stats.m_ReceivedBytes) / Stats.m_RequestCount) : 0u), NiceNum(Stats.m_RequestTimeNS > 0u ? 
static_cast((Stats.m_ReceivedBytes * 8 * 1000000000) / Stats.m_RequestTimeNS) : 0u), NiceBytes(Stats.m_PeakReceivedBytes)); } size_t AddBlock(RwLock& BlocksLock, std::vector& Blocks) { size_t BlockIndex; { RwLock::ExclusiveLockScope _(BlocksLock); BlockIndex = Blocks.size(); Blocks.resize(BlockIndex + 1); } return BlockIndex; } // BlockComposer packs attachment chunks (each identified by an IoHash and a byte size) into // fixed-size blocks subject to two constraints: // - The total encoded content of a block must not exceed UsableBlockSize bytes. // - A block may contain at most MaxChunksPerBlock chunk entries. // // Chunks belonging to the same op key (Oid) are kept together in one block whenever possible, // so that a single block fetch can satisfy an entire op without needing to read multiple blocks. // // When a block is complete the OnNewBlock callback is invoked with ownership of the chunk-hash // vector for that block. The callback is also invoked for any partially-filled pending block // that remains after all attachments have been processed. class BlockComposer { public: struct Configuration { uint64_t MaxBlockSize = 0; // Total encoded block size limit in bytes (includes header overhead). uint64_t MaxChunksPerBlock = 0; // Maximum number of chunk entries allowed in a single block. uint64_t MaxChunkEmbedSize = 0; // Maximum size of one embeddable chunk; used to calculate worst-case header size. std::function IsCancelledFunc; // Optional: if set and returns true, Compose returns early without emitting remaining blocks. }; explicit BlockComposer(const Configuration& Config) : m_Config(Config), m_UsableBlockSize(CalculateUsableBlockSize(m_Config)) {} // Compose distributes AttachmentHashes into blocks via a two-phase algorithm. // // Phase 1 - Gather (inner while loop): // Starting from the current index, collect all consecutive attachments that share the same // op key (Oid) into CurrentOpRawHashes / CurrentOpChunkSizes. 
Collection stops (with // CurrentOpFillFullBlock = false) when a different op key is encountered. Collection also // stops early (with CurrentOpFillFullBlock = true) if adding the next same-key attachment // would exceed m_UsableBlockSize by bytes OR would reach MaxChunksPerBlock by count - // meaning the gathered chunks exactly saturate one block and must be emitted immediately. // // Phase 2 - Place (while loop over CurrentOpChunkSizes): // Decides where the gathered chunks go. Exactly one of four mutually exclusive paths runs // per iteration; after each path the loop re-evaluates with whatever chunks remain: // // Path A: CurrentOpFillFullBlock == true // The gathered set exactly fills one block. Emit it immediately as a standalone block // and clear CurrentOpChunkSizes. The pending block is left untouched. // // Path B: All gathered chunks fit in the pending block (both size and count constraints met) // Merge the gathered chunks into PendingChunkHashes/PendingBlockSize and clear the // current-op buffers. If the pending block is now exactly full, flush it immediately. // // Path C: Gathered chunks don't fit AND pending block is >75% full by bytes // The pending block is already well-utilised; flush it now and loop so that the gathered // chunks are re-evaluated against the freshly emptied pending block. // // Path D: Gathered chunks don't fit AND pending block is <=75% full by bytes // The binding constraint is chunk count, not bytes. Greedily fill the pending block with // as many gathered chunks as fit (stopping at the first chunk that would violate either // size or count), flush the pending block, remove the added chunks from the current-op // buffers, and loop so the remaining gathered chunks are re-evaluated. // // Final flush: after all attachments have been processed, any non-empty pending block is // emitted. 
void Compose(std::span AttachmentHashes, std::span AttachmentSizes, std::span AttachmentKeys, const std::function&& ChunkRawHashes)>& OnNewBlock) { std::vector PendingChunkHashes; uint64_t PendingBlockSize = 0; size_t SortedUploadAttachmentsIndex = 0; Stopwatch AssembleBlocksProgressTimer; while (SortedUploadAttachmentsIndex < AttachmentHashes.size()) { if (m_Config.IsCancelledFunc && m_Config.IsCancelledFunc()) { return; } const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex]; const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex]; uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex]; ZEN_ASSERT(CurrentOpAttachmentsSize <= m_Config.MaxChunkEmbedSize); std::vector CurrentOpRawHashes; CurrentOpRawHashes.push_back(FirstAttachmentHash); std::vector CurrentOpChunkSizes; CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize); bool CurrentOpFillFullBlock = false; while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size()) { size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size(); const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex]; if (NextAttachmentOpKey != FirstAttachmentOpKey) { break; } const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex]; uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex]; ZEN_ASSERT(NextOpAttachmentSize <= m_Config.MaxChunkEmbedSize); if (CurrentOpAttachmentsSize + NextOpAttachmentSize > m_UsableBlockSize) { CurrentOpFillFullBlock = true; break; } CurrentOpRawHashes.push_back(NextAttachmentHash); CurrentOpChunkSizes.push_back(NextOpAttachmentSize); CurrentOpAttachmentsSize += NextOpAttachmentSize; if (CurrentOpRawHashes.size() == m_Config.MaxChunksPerBlock) { CurrentOpFillFullBlock = true; break; } } SortedUploadAttachmentsIndex += CurrentOpChunkSizes.size(); while (!CurrentOpChunkSizes.empty()) { 
size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize); ZEN_ASSERT(CurrentOpAttachmentCount <= m_Config.MaxChunksPerBlock); // Path A: gathered chunks exactly fill one block -- emit as a standalone block immediately. if (CurrentOpFillFullBlock) { OnNewBlock(std::move(CurrentOpRawHashes)); CurrentOpChunkSizes.clear(); CurrentOpAttachmentsSize = 0; CurrentOpFillFullBlock = false; } else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= m_UsableBlockSize && (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= m_Config.MaxChunksPerBlock) { // Path B: all gathered chunks fit in the pending block -- merge them in. PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end()); PendingBlockSize += CurrentOpAttachmentsSize; ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); CurrentOpRawHashes.clear(); CurrentOpChunkSizes.clear(); CurrentOpAttachmentsSize = 0; if (PendingBlockSize == m_UsableBlockSize || PendingChunkHashes.size() == m_Config.MaxChunksPerBlock) { OnNewBlock(std::move(PendingChunkHashes)); PendingChunkHashes.clear(); PendingBlockSize = 0; } } else if (PendingBlockSize > (m_UsableBlockSize * 3) / 4) { // Path C: gathered chunks don't fit AND pending block is >75% full by bytes -- flush pending // block now; loop to re-evaluate gathered chunks against the freshly emptied pending block. OnNewBlock(std::move(PendingChunkHashes)); PendingChunkHashes.clear(); PendingBlockSize = 0; } else { // Path D: gathered chunks don't fit AND pending block is <=75% full by bytes -- the // binding constraint is chunk count. Greedily fill the pending block with as many // chunks as fit, flush it, remove them from the current-op buffers, and loop with the // remaining gathered chunks in the next iteration. 
size_t AddedChunkCount = 0; uint64_t AddedChunkSize = 0; for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++) { uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex]; if (PendingBlockSize + ChunkSize > m_UsableBlockSize) { break; } if (PendingChunkHashes.size() == m_Config.MaxChunksPerBlock) { break; } PendingBlockSize += ChunkSize; PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]); AddedChunkSize += ChunkSize; AddedChunkCount++; ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); } ZEN_ASSERT(AddedChunkSize <= CurrentOpAttachmentsSize); ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size()); OnNewBlock(std::move(PendingChunkHashes)); PendingChunkHashes.clear(); PendingBlockSize = 0; CurrentOpRawHashes.erase(CurrentOpRawHashes.begin(), CurrentOpRawHashes.begin() + AddedChunkCount); CurrentOpChunkSizes.erase(CurrentOpChunkSizes.begin(), CurrentOpChunkSizes.begin() + AddedChunkCount); CurrentOpAttachmentsSize -= AddedChunkSize; } } } if (!PendingChunkHashes.empty()) { ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); OnNewBlock(std::move(PendingChunkHashes)); PendingChunkHashes.clear(); } } private: // CalculateUsableBlockSize computes the maximum bytes available for chunk content in one // block. The block header encodes: // - A CompressedBuffer header of fixed size. // - One VarUInt field encoding MaxChunksPerBlock. // - MaxChunksPerBlock VarUInt entries each encoding one chunk size (bounded by // MaxChunkEmbedSize, which determines the worst-case VarUInt width). 
// MaxHeaderSize is the worst-case total header size, so // UsableBlockSize = MaxBlockSize - MaxHeaderSize is a conservative bound that guarantees // chunk content always fits within the encoded block. static uint64_t CalculateUsableBlockSize(const Configuration& Config) { ZEN_ASSERT(Config.MaxChunksPerBlock > 0); ZEN_ASSERT(Config.MaxChunkEmbedSize > 0); uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(Config.MaxChunksPerBlock) + MeasureVarUInt(Config.MaxChunkEmbedSize) * Config.MaxChunksPerBlock; ZEN_ASSERT(Config.MaxBlockSize > MaxHeaderSize); return Config.MaxBlockSize - MaxHeaderSize; } const Configuration m_Config; const uint64_t m_UsableBlockSize = 0; }; IoBuffer CompressToTempFile(const IoHash& RawHash, const IoBuffer& RawData, const std::filesystem::path& AttachmentPath, OodleCompressor Compressor, OodleCompressionLevel CompressionLevel) { if (IsFile(AttachmentPath)) { ZEN_WARN("Temp attachment file already exists at '{}', truncating", AttachmentPath); } BasicFile CompressedFile; std::error_code Ec; CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec); if (Ec) { throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath)); } if (RawData.GetSize() < 512u * 1024u) { CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), Compressor, CompressionLevel); if (!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash)); } CompressedFile.Write(CompressedBlob.GetCompressed(), 0); } else { bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawData)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset, SourceSize); CompressedFile.Write(RangeBuffer, Offset); }, Compressor, CompressionLevel); if (!CouldCompress) { // Compressed is larger than source data... 
CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None); if (!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash)); } CompressedFile.SetFileSize(0); CompressedFile.Write(CompressedBlob.GetCompressed(), 0); } } IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); CompressedFile.Close(); TempAttachmentBuffer.SetDeleteOnClose(true); ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); return TempAttachmentBuffer; } struct FoundAttachment { std::filesystem::path RawPath; // If not stored in cid uint64_t Size = 0; Oid Key = Oid::Zero; }; CbObject RewriteOplog( LoggerRef InLog, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, bool IgnoreMissingAttachments, bool EmbedLooseFiles, const std::filesystem::path& AttachmentTempPath, std::unordered_map& UploadAttachments, // TODO: Rename to OutUploadAttachments JobContext* OptionalContext) { ZEN_SCOPED_LOG(InLog); size_t OpCount = 0; CreateDirectories(AttachmentTempPath); auto RewriteOp = [&](const Oid& Key, CbObjectView Op, const std::function& CB) { bool OpRewritten = false; CbArrayView Files = Op["files"sv].AsArrayView(); if (Files.Num() == 0) { CB(Op); return; } CbWriter Cbo; Cbo.BeginArray("files"sv); for (CbFieldView& Field : Files) { if (remotestore_impl::IsCancelled(OptionalContext)) { return; } bool CopyField = true; if (CbObjectView View = Field.AsObjectView()) { IoHash DataHash = View["data"sv].AsHash(); if (DataHash == IoHash::Zero) { std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = (Project.RootDir / ServerPath).make_preferred(); MakeSafeAbsolutePathInPlace(FilePath); if (!IsFile(FilePath)) { remotestore_impl::ReportMessage( OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", FilePath, View["id"sv].AsObjectId())); 
if (IgnoreMissingAttachments) { continue; } else { ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); Sb.Append(FilePath.string()); Sb.Append("' for op: \n"); View.ToJson(Sb); throw std::runtime_error(Sb.ToString()); } } { Stopwatch HashTimer; SharedBuffer DataBuffer(IoBufferBuilder::MakeFromFile(FilePath)); DataHash = IoHash::HashBuffer(CompositeBuffer(DataBuffer)); ZEN_INFO("Hashed loose file '{}' {}: {} in {}", FilePath, NiceBytes(DataBuffer.GetSize()), DataHash, NiceTimeSpanMs(HashTimer.GetElapsedTimeMs())); } // Rewrite file array entry with new data reference CbObjectWriter Writer; RewriteCbObject(Writer, View, [&](CbObjectWriter&, CbFieldView Field) -> bool { if (Field.GetName() == "data"sv) { // omit this field as we will write it explicitly ourselves return true; } return false; }); Writer.AddBinaryAttachment("data"sv, DataHash); UploadAttachments.insert_or_assign(DataHash, FoundAttachment{.RawPath = FilePath, .Key = Key}); CbObject RewrittenOp = Writer.Save(); Cbo.AddObject(std::move(RewrittenOp)); CopyField = false; } } if (CopyField) { Cbo.AddField(Field); } else { OpRewritten = true; } } if (!OpRewritten) { CB(Op); return; } Cbo.EndArray(); CbArray FilesArray = Cbo.Save().AsArray(); CbObject RewrittenOp = RewriteCbObject(Op, [&](CbObjectWriter& NewWriter, CbFieldView Field) -> bool { if (Field.GetName() == "files"sv) { NewWriter.AddArray("files"sv, FilesArray); return true; } return false; }); CB(RewrittenOp); }; remotestore_impl::ReportMessage(OptionalContext, "Building exported oplog and collecting attachments"); Stopwatch Timer; size_t TotalOpCount = Oplog.GetOplogEntryCount(); Stopwatch RewriteOplogTimer; CbObjectWriter SectionOpsWriter; SectionOpsWriter.BeginArray("ops"sv); { Stopwatch BuildingOplogProgressTimer; Oplog.IterateOplogWithKey([&](int, const Oid& Key, CbObjectView Op) { if (remotestore_impl::IsCancelled(OptionalContext)) { return; } Op.IterateAttachments([&](CbFieldView FieldView) { 
UploadAttachments.insert_or_assign(FieldView.AsAttachment(), FoundAttachment{.Key = Key}); }); if (EmbedLooseFiles) { RewriteOp(Key, Op, [&SectionOpsWriter](CbObjectView Op) { SectionOpsWriter << Op; }); } else { SectionOpsWriter << Op; } OpCount++; if (OpCount % 1000 == 0) { remotestore_impl::ReportProgress(OptionalContext, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, TotalOpCount - OpCount, BuildingOplogProgressTimer.GetElapsedTimeMs()); } }); if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } if (TotalOpCount > 0) { remotestore_impl::ReportProgress(OptionalContext, "Building oplog"sv, fmt::format("{} ops processed", OpCount), TotalOpCount, 0, BuildingOplogProgressTimer.GetElapsedTimeMs()); } } SectionOpsWriter.EndArray(); // "ops" return SectionOpsWriter.Save(); } struct FoundChunkedFile { IoHash RawHash = IoHash::Zero; IoBuffer Source; uint64_t Offset = 0; uint64_t Size = 0; }; void FindChunkSizes(CidStore& ChunkStore, WorkerThreadPool& WorkerPool, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool AllowChunking, const std::filesystem::path& AttachmentTempPath, std::unordered_map& UploadAttachments, std::unordered_set& MissingHashes, std::vector& AttachmentsToChunk, JobContext* OptionalContext) { if (UploadAttachments.empty()) { return; } Stopwatch FindChunkSizesTimer; RwLock FindChunkSizesLock; std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (auto& It : UploadAttachments) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); break; } Work.ScheduleWork( WorkerPool, [&ChunkStore, UploadAttachment = &It.second, RawHash = It.first, &FindChunkSizesLock, &MissingHashes, AttachmentTempPath, MaxChunkEmbedSize, ChunkFileSizeLimit, AllowChunking, &AttachmentsToChunk, OptionalContext](std::atomic& AbortFlag) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } if 
(AbortFlag) { return; } if (!UploadAttachment->RawPath.empty()) { const std::filesystem::path& FilePath = UploadAttachment->RawPath; IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); if (RawData) { UploadAttachment->Size = RawData.GetSize(); if (AllowChunking && UploadAttachment->Size > ChunkFileSizeLimit) { FindChunkSizesLock.WithExclusiveLock([&]() { AttachmentsToChunk.push_back( FoundChunkedFile{.RawHash = RawHash, .Source = RawData, .Offset = 0, .Size = RawData.GetSize()}); }); } } else { FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } else { IoBuffer Data = ChunkStore.FindChunkByCid(RawHash); if (Data) { UploadAttachment->Size = Data.GetSize(); if (AllowChunking && Data.IsWholeFile()) { IoHash VerifyRawHash; uint64_t VerifyRawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Data), VerifyRawHash, VerifyRawSize); if (Compressed) { if (VerifyRawSize > ChunkFileSizeLimit) { OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { if (CompressionLevel == OodleCompressionLevel::None) { CompositeBuffer Decompressed = Compressed.DecompressToComposite(); if (Decompressed) { std::span Segments = Decompressed.GetSegments(); if (Segments.size() == 1) { IoBuffer DecompressedData = Segments[0].AsIoBuffer(); IoBufferFileReference DecompressedFileRef; if (DecompressedData.GetFileReference(DecompressedFileRef)) { // Are we still pointing to disk? 
FindChunkSizesLock.WithExclusiveLock([&]() { AttachmentsToChunk.push_back( FoundChunkedFile{.RawHash = RawHash, .Source = Data, .Offset = DecompressedFileRef.FileChunkOffset, .Size = DecompressedFileRef.FileChunkSize}); }); } } } } } } } } } else { FindChunkSizesLock.WithExclusiveLock([RawHash, &MissingHashes]() { MissingHashes.insert(RawHash); }); } } }); } Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } remotestore_impl::ReportProgress(OptionalContext, "Finding attachments"sv, fmt::format("{}{} remaining...", AbortFlag.load() ? "Aborting, " : "", PendingWork), UploadAttachments.size(), PendingWork, FindChunkSizesTimer.GetElapsedTimeMs()); }); if (!AbortFlag.load()) { remotestore_impl::ReportProgress(OptionalContext, "Finding attachments"sv, "", UploadAttachments.size(), 0, FindChunkSizesTimer.GetElapsedTimeMs()); } } struct ChunkedFile { IoBuffer Source; ChunkedInfoWithSource Chunked; }; std::vector ChunkAttachments(WorkerThreadPool& WorkerPool, const std::vector& AttachmentsToChunk, JobContext* OptionalContext) { if (AttachmentsToChunk.empty()) { return {}; } Stopwatch ChunkAttachmentsTimer; std::vector ChunkedFiles(AttachmentsToChunk.size()); std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (size_t ChunkFileIndexToChunk = 0; ChunkFileIndexToChunk < AttachmentsToChunk.size(); ChunkFileIndexToChunk++) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); break; } Work.ScheduleWork(WorkerPool, [&AttachmentsToChunk, ChunkFileIndexToChunk, &ChunkedFiles, OptionalContext](std::atomic& AbortFlag) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } if (AbortFlag) { return; } const remotestore_impl::FoundChunkedFile& AttachmentToChunk = AttachmentsToChunk[ChunkFileIndexToChunk]; 
const IoHash& RawHash = AttachmentToChunk.RawHash; const IoBuffer& Buffer = AttachmentToChunk.Source; IoBufferFileReference FileRef; bool IsFile = Buffer.GetFileReference(FileRef); ZEN_ASSERT(IsFile); Stopwatch ChunkOneTimer; uint64_t Offset = AttachmentToChunk.Offset; uint64_t Size = AttachmentToChunk.Size; BasicFile SourceFile; SourceFile.Attach(FileRef.FileHandle); auto __ = MakeGuard([&SourceFile]() { SourceFile.Detach(); }); ChunkedFile& Chunked = ChunkedFiles[ChunkFileIndexToChunk]; Chunked.Source = Buffer; Chunked.Chunked = ChunkData(SourceFile, Offset, Size, UShaderByteCodeParams); ZEN_ASSERT(Chunked.Chunked.Info.RawHash == RawHash); ZEN_INFO("Chunked large attachment '{}' {} into {} chunks in {}", RawHash, NiceBytes(Chunked.Chunked.Info.RawSize), Chunked.Chunked.Info.ChunkHashes.size(), NiceTimeSpanMs(ChunkOneTimer.GetElapsedTimeMs())); }); } Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } remotestore_impl::ReportProgress(OptionalContext, "Chunking attachments"sv, fmt::format("{}{} remaining...", AbortFlag.load() ? 
"Aborting, " : "", PendingWork), AttachmentsToChunk.size(), PendingWork, ChunkAttachmentsTimer.GetElapsedTimeMs()); }); if (!AbortFlag.load()) { remotestore_impl::ReportProgress(OptionalContext, "Chunking attachments"sv, "", AttachmentsToChunk.size(), 0, ChunkAttachmentsTimer.GetElapsedTimeMs()); } return ChunkedFiles; } void ResolveAttachments(CidStore& ChunkStore, WorkerThreadPool& WorkerPool, uint64_t MaxChunkEmbedSize, const std::filesystem::path& AttachmentTempPath, std::unordered_map& UploadAttachments, std::unordered_map& LargeChunkAttachments, std::unordered_map, IoHash::Hasher>& LooseUploadAttachments, JobContext* OptionalContext) { ZEN_ASSERT(!UploadAttachments.empty()); Stopwatch UploadAttachmentsTimer; RwLock ResolveLock; std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (auto& It : UploadAttachments) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); break; } Work.ScheduleWork( WorkerPool, [&ChunkStore, MaxChunkEmbedSize, &AttachmentTempPath, &ResolveLock, &LargeChunkAttachments, &LooseUploadAttachments, UploadAttachment = &It.second, RawHash = It.first, OptionalContext](std::atomic& AbortFlag) { if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } if (AbortFlag) { return; } if (!UploadAttachment->RawPath.empty()) { if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) { // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't // it will be a loose attachment instead of going into a block TGetAttachmentBufferFunc FetchFunc = [RawPath = UploadAttachment->RawPath, AttachmentTempPath, RawSize = UploadAttachment->Size]( const IoHash& RawHash) -> CompositeBuffer { IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); if (!RawData) { throw std::runtime_error( fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); } std::filesystem::path 
AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, RawData, AttachmentPath, OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); if (!TempAttachmentBuffer) { throw std::runtime_error(fmt::format("Failed to compressed source file for blob {} from '{}' to '{}'", RawHash, RawPath, AttachmentPath)); } TempAttachmentBuffer.SetDeleteOnClose(true); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); }; RwLock::ExclusiveLockScope _(ResolveLock); LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); } else { // Compress inline - check compressed size to see if it should go into a block or not IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); if (!RawData) { throw std::runtime_error( fmt::format("Failed to read source file for blob {} from '{}'", RawHash, UploadAttachment->RawPath)); } std::filesystem::path TempFilePath = AttachmentTempPath; TempFilePath.append(RawHash.ToHexString()); IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, RawData, TempFilePath, OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); TempAttachmentBuffer.SetDeleteOnClose(true); uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); ZEN_INFO("Saved temp attachment to '{}', {} ({})", TempFilePath, NiceBytes(UploadAttachment->Size), NiceBytes(CompressedSize)); if (CompressedSize > MaxChunkEmbedSize) { TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) mutable { return CompositeBuffer(SharedBuffer(std::move(Data))); }; RwLock::ExclusiveLockScope _(ResolveLock); LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); } else { UploadAttachment->Size = CompressedSize; std::pair 
LooseAttachment(RawData.GetSize(), std::move(TempAttachmentBuffer)); RwLock::ExclusiveLockScope _(ResolveLock); LooseUploadAttachments.insert_or_assign(RawHash, std::move(LooseAttachment)); } } } else { if (UploadAttachment->Size > MaxChunkEmbedSize) { TGetAttachmentBufferFunc FetchFunc = [&ChunkStore](const IoHash& RawHash) { return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); }; RwLock::ExclusiveLockScope _(ResolveLock); LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); } } }); } Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, fmt::format("{}{} remaining...", AbortFlag.load() ? "Aborting, " : "", PendingWork), UploadAttachments.size(), PendingWork, UploadAttachmentsTimer.GetElapsedTimeMs()); }); if (!AbortFlag.load()) { remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "", UploadAttachments.size(), 0, UploadAttachmentsTimer.GetElapsedTimeMs()); } } RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; CbArrayView OpsArray = SectionObject["ops"sv].AsArrayView(); const uint64_t OpCount = OpsArray.Num(); ReportMessage(OptionalContext, fmt::format("Writing {} ops to oplog", OpCount)); const size_t OpsBatchSize = 8192; std::vector OpsData; std::vector OpDataOffsets; size_t OpsCompleteCount = 0; OpsData.reserve(OpsBatchSize); auto AppendBatch = [&]() { std::vector Ops; Ops.reserve(OpDataOffsets.size()); for (size_t OpDataOffset : OpDataOffsets) { Ops.emplace_back(CbObjectView(&OpsData[OpDataOffset])); } std::vector OpLsns = Oplog.AppendNewOplogEntries(Ops); OpsCompleteCount += OpLsns.size(); OpsData.clear(); OpDataOffsets.clear(); 
ReportProgress(OptionalContext, "Writing oplog"sv, fmt::format("{} remaining...", OpCount - OpsCompleteCount), OpCount, OpCount - OpsCompleteCount, Timer.GetElapsedTimeMs()); }; BinaryWriter Writer; for (CbFieldView OpEntry : OpsArray) { CbObjectView Op = OpEntry.AsObjectView(); Op.CopyTo(Writer); OpDataOffsets.push_back(OpsData.size()); OpsData.insert(OpsData.end(), (const uint8_t*)Writer.GetData(), ((const uint8_t*)Writer.GetData()) + Writer.GetSize()); Writer.Reset(); if (OpDataOffsets.size() == OpsBatchSize) { AppendBatch(); } } if (!OpDataOffsets.empty()) { AppendBatch(); } if (OpCount > 0) { ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0, Timer.GetElapsedTimeMs()); } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } struct DownloadInfo { uint64_t OplogSizeBytes = 0; std::atomic AttachmentsDownloaded = 0; std::atomic AttachmentBlocksDownloaded = 0; std::atomic AttachmentBlocksRangesDownloaded = 0; std::atomic AttachmentBytesDownloaded = 0; std::atomic AttachmentBlockBytesDownloaded = 0; std::atomic AttachmentBlockRangeBytesDownloaded = 0; std::atomic AttachmentsStored = 0; std::atomic AttachmentBytesStored = 0; std::atomic_size_t MissingAttachmentCount = 0; std::atomic ChunksCompleteCount = 0; }; class JobContextSink : public logging::Sink { public: explicit JobContextSink(JobContext* Context) : m_Context(Context) {} void Log(const logging::LogMessage& Msg) override { if (m_Context) { m_Context->ReportMessage(Msg.GetPayload()); } } void Flush() override {} void SetFormatter(std::unique_ptr) override {} private: JobContext* m_Context; }; class JobContextLogger { public: explicit JobContextLogger(JobContext* OptionalContext) { if (!OptionalContext) { return; } logging::SinkPtr ContextSink(new JobContextSink(OptionalContext)); Ref DefaultSink = GetDefaultBroadcastSink(); std::vector Sinks; if (DefaultSink) { Sinks.push_back(DefaultSink); } Sinks.push_back(std::move(ContextSink)); Ref Broadcast(new 
logging::BroadcastSink(std::move(Sinks))); m_Log = Ref(new logging::Logger("jobcontext", Broadcast)); } LoggerRef Log() const { return m_Log ? LoggerRef(*m_Log) : zen::Log(); } private: Ref m_Log; };
// DownloadAndSaveBlockChunks
// Schedules a task on Context.NetworkWorkerPool that bulk-loads selected chunks of a block and then
// schedules a second task on Context.WorkerPool that stores them in Context.ChunkStore.
//   ThinBlockDescription - block whose ChunkRawHashes are indexed by NeededChunkIndexes (moved into the task)
//   NeededChunkIndexes   - indexes of the chunks to request (moved into the task)
//   Info                 - download/store counters updated by both tasks
//   LoadAttachmentsTimer / DownloadStartMS - used to record when the first download started
// Context, AttachmentWork, Info, LoadAttachmentsTimer and DownloadStartMS are captured by reference,
// so they must outlive the scheduled work.
// On a load error the message is reported and Info.MissingAttachmentCount is incremented; a
// RemoteStoreError is thrown unless Context.IgnoreMissingAttachments is set. Any other
// std::exception is rewrapped as a RemoteStoreError with InternalServerError.
void DownloadAndSaveBlockChunks(LoadOplogContext& Context, ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, ThinChunkBlockDescription&& ThinBlockDescription, std::vector&& NeededChunkIndexes) { AttachmentWork.ScheduleWork( Context.NetworkWorkerPool, [&Context, &AttachmentWork, ThinBlockDescription = std::move(ThinBlockDescription), NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, &DownloadStartMS](std::atomic& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockChunks"); ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { return; } try { std::vector Chunks; Chunks.reserve(NeededChunkIndexes.size()); for (uint32_t ChunkIndex : NeededChunkIndexes) { Chunks.push_back(ThinBlockDescription.ChunkRawHashes[ChunkIndex]); }
// Record the start time of the first download only: the exchange succeeds solely while
// DownloadStartMS still holds the "unset" value (uint64_t)-1.
uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); RemoteProjectStore::LoadAttachmentsResult Result = Context.RemoteStore.LoadAttachments(Chunks); if (Result.ErrorCode) { ReportMessage(Context.OptionalJobContext, fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), Result.ErrorCode, Result.Reason)); Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text); } return; } Info.AttachmentsDownloaded.fetch_add(Result.Chunks.size()); for (const auto& It : Result.Chunks) { uint64_t ChunkSize = It.second.GetCompressedSize(); Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)))); if (AbortFlag) { return; } 
// Second stage: store the downloaded chunks on the general worker pool. Only chunks the store
// reports as New are added to the "stored" counters.
// NOTE(review): unlike the whole-block and single-attachment paths further down in this file, this
// path never updates Info.ChunksCompleteCount - confirm that is intended.
AttachmentWork.ScheduleWork( Context.WorkerPool, [&Info, &Context, Chunks = std::move(Result.Chunks)](std::atomic& AbortFlag) { if (AbortFlag) { return; } if (!Chunks.empty()) { std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; WriteAttachmentBuffers.reserve(Chunks.size()); WriteRawHashes.reserve(Chunks.size()); for (const auto& It : Chunks) { WriteAttachmentBuffers.push_back(It.second.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.push_back(It.first); } std::vector InsertResults = Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) { Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); Info.AttachmentsStored.fetch_add(1); } } } }, WorkerThreadPool::EMode::EnableBacklog); } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to bulk load {} attachments", NeededChunkIndexes.size()), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); };
// DownloadAndSaveBlock
// Schedules a task on Context.NetworkWorkerPool that fetches the whole block BlockHash - from
// Context.OptionalCache when one is configured, otherwise (or on a cache miss) from
// Context.RemoteStore - and then schedules a task on Context.WorkerPool that validates the block
// and stores the chunks it contains that are listed in AllNeededPartialChunkHashesLookup.
//   ChunkDownloadedFlags - one flag per needed chunk (indexed by the lookup's value); a chunk is
//                          only stored by the task that flips its flag from false to true
//   RetriesLeft          - number of times a malformed block may be re-downloaded before the
//                          failure is thrown as a RemoteStoreError
// Everything captured by reference (Context, AttachmentWork, Info, the timer, DownloadStartMS and
// the lookup map) must outlive the scheduled work; BlockHash is copied into the task.
void DownloadAndSaveBlock(LoadOplogContext& Context, ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, const IoHash& BlockHash, const tsl::robin_map& AllNeededPartialChunkHashesLookup, std::span> ChunkDownloadedFlags, uint32_t RetriesLeft) { AttachmentWork.ScheduleWork( Context.NetworkWorkerPool, [&AttachmentWork, &Context, &Info, &LoadAttachmentsTimer, &DownloadStartMS, RetriesLeft, BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags](std::atomic& AbortFlag) { ZEN_TRACE_CPU("DownloadBlock"); ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { return; } try { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); IoBuffer BlobBuffer; if 
(Context.OptionalCache) { BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockHash); }
// No cache configured or cache miss: fetch the block from the remote store. A failed fetch is
// reported and counted as a missing attachment; it only throws when missing attachments are not
// being ignored. A freshly fetched block is written to the cache when Context.PopulateCache is set.
if (!BlobBuffer) { RemoteProjectStore::LoadAttachmentResult BlockResult = Context.RemoteStore.LoadAttachment(BlockHash); if (BlockResult.ErrorCode) { ReportMessage(Context.OptionalJobContext, fmt::format("Failed to download block attachment {} ({}): {}", BlockHash, BlockResult.Reason, BlockResult.Text)); Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text); } return; } if (AbortFlag) { return; } BlobBuffer = std::move(BlockResult.Bytes); ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", BlockHash, NiceTimeSpanMs(static_cast(BlockResult.ElapsedSeconds * 1000)), NiceBytes(BlobBuffer.Size())); if (Context.OptionalCache && Context.PopulateCache) { Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, BlockHash, BlobBuffer.GetContentType(), CompositeBuffer(SharedBuffer(BlobBuffer))); } } uint64_t BlockSize = BlobBuffer.GetSize(); Info.AttachmentBlocksDownloaded.fetch_add(1); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
// Second stage on the general worker pool: the block bytes are moved into the task, which parses
// the compressed block, checks that its raw hash equals BlockHash, decompresses it and stores the
// needed chunks.
AttachmentWork.ScheduleWork( Context.WorkerPool, [&AttachmentWork, &Context, &Info, &LoadAttachmentsTimer, &DownloadStartMS, RetriesLeft, BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, Bytes = std::move(BlobBuffer)](std::atomic& AbortFlag) { ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { return; } try { ZEN_ASSERT(Bytes.Size() > 0); std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Bytes), RawHash, RawSize);
// ErrorString stays empty on success. Each validation failure below fills it; after the chain a
// non-empty ErrorString triggers either a re-download (while RetriesLeft > 0) or a RemoteStoreError.
std::string ErrorString; if (!Compressed) { ErrorString = fmt::format("Block attachment {} is malformed, can't parse as compressed binary", BlockHash); } else if (RawHash != BlockHash) { ErrorString = fmt::format("Block attachment {} has mismatching 
raw hash ({})", BlockHash, RawHash); } else if (CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); !BlockPayload) { ErrorString = fmt::format("Block attachment {} is malformed, can't decompress payload", BlockHash); } else { uint64_t PotentialSize = 0; uint64_t UsedSize = 0; uint64_t BlockSize = BlockPayload.GetSize(); uint64_t BlockHeaderSize = 0; bool StoreChunksOK = IterateChunkBlock( BlockPayload.Flatten(), [&AllNeededPartialChunkHashesLookup, &ChunkDownloadedFlags, &WriteAttachmentBuffers, &WriteRawHashes, &Info, &PotentialSize](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(AttachmentRawHash); if (ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) { bool Expected = false; if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) { WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); IoHash RawHash; uint64_t RawSize; ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader( WriteAttachmentBuffers.back(), RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)); ZEN_ASSERT(RawHash == AttachmentRawHash); WriteRawHashes.emplace_back(AttachmentRawHash); PotentialSize += WriteAttachmentBuffers.back().GetSize(); } } }, BlockHeaderSize); if (!StoreChunksOK) { ErrorString = fmt::format("Invalid format for block {}", BlockHash); } else { if (!WriteAttachmentBuffers.empty()) { std::vector Results = Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < Results.size(); Index++) { const CidStore::InsertResult& Result = Results[Index]; if (Result.New) { Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); Info.AttachmentsStored.fetch_add(1); UsedSize += WriteAttachmentBuffers[Index].GetSize(); } } Info.ChunksCompleteCount += WriteAttachmentBuffers.size(); if (UsedSize < BlockSize) { ZEN_DEBUG("Used {} (skipping {}) out of {} for block {} ({} %) 
(use of matching {}%)", NiceBytes(UsedSize), NiceBytes(BlockSize - UsedSize), NiceBytes(BlockSize), BlockHash, (100 * UsedSize) / BlockSize, PotentialSize > 0 ? (UsedSize * 100) / PotentialSize : 0); } } } } if (!ErrorString.empty()) { if (RetriesLeft > 0) { ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); return DownloadAndSaveBlock(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, BlockHash, AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, RetriesLeft - 1); } else { ReportMessage(Context.OptionalJobContext, ErrorString); throw RemoteStoreError(ErrorString, gsl::narrow(HttpResponseCode::InternalServerError), {}); } } } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to save block attachment {}", BlockHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to download block attachment {}", BlockHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; void DownloadPartialBlock(LoadOplogContext& Context, std::atomic& AbortFlag, DownloadInfo& Info, double& DownloadTimeSeconds, const ChunkBlockDescription& BlockDescription, bool BlockExistsInCache, std::span BlockRangeDescriptors, size_t BlockRangeIndexStart, size_t BlockRangeCount, std::function> OffsetAndLengths)>&& OnDownloaded) { ZEN_ASSERT(Context.StoreMaxRangeCountPerRequest != 0); ZEN_ASSERT(BlockExistsInCache == false || Context.CacheMaxRangeCountPerRequest != 0); std::vector> Ranges; Ranges.reserve(BlockRangeDescriptors.size()); for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++) { const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = 
// DownloadPartialBlock (its signature begins on the preceding line)
// Downloads BlockRangeCount byte ranges of block BlockDescription.BlockHash, starting at index
// BlockRangeIndexStart of BlockRangeDescriptors, in sub-batches:
//   - when BlockExistsInCache, Context.OptionalCache is asked first, with at most
//     Context.CacheMaxRangeCountPerRequest ranges per request;
//   - otherwise, or when the cache does not return a usable payload, Context.RemoteStore is asked
//     with at most Context.StoreMaxRangeCountPerRequest ranges per request.
// Every fetched payload is passed to OnDownloaded together with the descriptor index of its first
// range and the (offset, length) pairs locating each range inside the payload. Time spent fetching
// is accumulated into DownloadTimeSeconds. The loop stops as soon as AbortFlag is set.
// Throws RemoteStoreError on a failed store request (unless Context.IgnoreMissingAttachments) and
// when the store returns a different number of ranges than requested.
BlockRangeDescriptors[BlockRangeIndex]; Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); } size_t SubBlockRangeCount = BlockRangeCount; size_t SubRangeCountComplete = 0; std::span> RangesSpan(Ranges);
// Each iteration handles one sub-batch. SubRangeCountComplete counts the ranges already handed to
// OnDownloaded (or skipped after an ignored failure); SubRangeStartIndex is the matching index
// into BlockRangeDescriptors.
while (SubRangeCountComplete < SubBlockRangeCount) { if (AbortFlag.load()) { break; } size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete;
// Cache attempt: a usable payload is delivered and the loop continues with the next sub-batch;
// anything else falls through to the remote store request after this if-block. A payload returned
// with an empty range list is treated as covering all remaining ranges.
if (BlockExistsInCache) { ZEN_ASSERT(Context.OptionalCache); size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.CacheMaxRangeCountPerRequest); if (SubRangeCount == 1) { // Legacy single-range path, prefer that for max compatibility const std::pair SubRange = RangesSpan[SubRangeCountComplete]; Stopwatch CacheTimer; IoBuffer PayloadBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second); DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; if (AbortFlag.load()) { break; } if (PayloadBuffer) { OnDownloaded(std::move(PayloadBuffer), SubRangeStartIndex, std::vector>{std::make_pair(0u, SubRange.second)}); SubRangeCountComplete += SubRangeCount; continue; } } else { auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); Stopwatch CacheTimer; BuildStorageCache::BuildBlobRanges RangeBuffers = Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges); DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; if (AbortFlag.load()) { break; } if (RangeBuffers.PayloadBuffer) { if (RangeBuffers.Ranges.empty()) { SubRangeCount = Ranges.size() - SubRangeCountComplete; OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); SubRangeCountComplete += SubRangeCount; continue; } else if (RangeBuffers.Ranges.size() == SubRangeCount) { OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges); SubRangeCountComplete += 
SubRangeCount; continue; } } } }
// Remote store request for the next sub-batch (reached when the cache is not used or did not
// deliver a usable payload).
size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.StoreMaxRangeCountPerRequest); auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); RemoteProjectStore::LoadAttachmentRangesResult BlockResult = Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges); DownloadTimeSeconds += BlockResult.ElapsedSeconds; if (AbortFlag.load()) { break; }
// A failed request is reported and counted once in Info.MissingAttachmentCount. When missing
// attachments are ignored nothing is delivered for this sub-batch, but the loop still advances
// past its ranges.
if (BlockResult.ErrorCode || !BlockResult.Bytes) { ReportMessage(Context.OptionalJobContext, fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}", SubRanges.size(), BlockDescription.BlockHash, BlockResult.ErrorCode, BlockResult.Reason)); Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError(BlockResult.Reason, BlockResult.ErrorCode, BlockResult.Text); } } else { if (BlockResult.Ranges.empty()) { // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Use the whole payload for the remaining ranges if (Context.OptionalCache && Context.PopulateCache) { Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(std::vector{BlockResult.Bytes})); if (AbortFlag.load()) { break; } } SubRangeCount = Ranges.size() - SubRangeCountComplete; OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); } else { if (BlockResult.Ranges.size() != SubRanges.size()) { throw RemoteStoreError(fmt::format("Range response for block {} contains {} ranges, expected {} ranges", BlockDescription.BlockHash, BlockResult.Ranges.size(), SubRanges.size()), gsl::narrow(HttpResponseCode::InternalServerError), ""); } OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges); } } SubRangeCountComplete += SubRangeCount; } } void DownloadAndSavePartialBlock(LoadOplogContext& Context, 
// DownloadAndSavePartialBlock (its signature begins on the preceding line; parameters continue below)
// Schedules a task on Context.NetworkWorkerPool that downloads BlockRangeCount ranges of the block
// described by BlockDescription via DownloadPartialBlock. For every payload received, a task is
// scheduled on Context.WorkerPool that cuts the payload into its ranges, validates each needed
// chunk against BlockDescription (hash and raw size) and stores the chunks in Context.ChunkStore.
//   BlockRangeDescriptors / BlockRangeIndexStart / BlockRangeCount - the ranges to fetch
//   AllNeededPartialChunkHashesLookup - maps a needed chunk hash to its index in ChunkDownloadedFlags
//   ChunkDownloadedFlags - a chunk is only stored by the task that flips its flag from false to true
//   RetriesLeft          - how many times a range batch may be re-downloaded after a bad chunk
// BlockDescription is copied into the tasks; Context, AttachmentWork, Info, the timer,
// DownloadStartMS and the lookup map are captured by reference and must outlive the scheduled work.
ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, const ChunkBlockDescription& BlockDescription, bool BlockExistsInCache, std::span BlockRangeDescriptors, size_t BlockRangeIndexStart, size_t BlockRangeCount, const tsl::robin_map& AllNeededPartialChunkHashesLookup, std::span> ChunkDownloadedFlags, uint32_t RetriesLeft) { AttachmentWork.ScheduleWork( Context.NetworkWorkerPool, [&AttachmentWork, &Context, &Info, &LoadAttachmentsTimer, &DownloadStartMS, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, BlockRangeIndexStart, BlockRangeCount, &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, RetriesLeft](std::atomic& AbortFlag) { ZEN_TRACE_CPU("DownloadBlockRanges"); ZEN_SCOPED_LOG(Context.Log); try { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); double DownloadElapsedSeconds = 0; uint64_t DownloadedBytes = 0;
// The callback passed below is invoked by DownloadPartialBlock for each fetched payload. It
// updates the range download counters and moves the payload, plus a copy of the offset/length
// pairs, into a worker-pool task.
DownloadPartialBlock( Context, AbortFlag, Info, DownloadElapsedSeconds, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, BlockRangeIndexStart, BlockRangeCount, [&](IoBuffer&& Buffer, size_t BlockRangeStartIndex, std::span> OffsetAndLengths) { uint64_t BlockPartSize = Buffer.GetSize(); DownloadedBytes += BlockPartSize; Info.AttachmentBlockRangeBytesDownloaded.fetch_add(BlockPartSize); Info.AttachmentBlocksRangesDownloaded++; AttachmentWork.ScheduleWork( Context.WorkerPool, [&AttachmentWork, &Context, &Info, &LoadAttachmentsTimer, &DownloadStartMS, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, BlockRangeStartIndex, &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, RetriesLeft, BlockPayload = std::move(Buffer), OffsetAndLengths = std::vector>(OffsetAndLengths.begin(), OffsetAndLengths.end())]( std::atomic& AbortFlag) { ZEN_SCOPED_LOG(Context.Log); try { ZEN_ASSERT(BlockPayload.Size() > 0); size_t RangeCount = OffsetAndLengths.size(); for (size_t RangeOffset = 0; 
RangeOffset < RangeCount; RangeOffset++) { if (AbortFlag) { return; } const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRangeDescriptors[BlockRangeStartIndex + RangeOffset]; const std::pair& OffsetAndLength = OffsetAndLengths[RangeOffset]; IoBuffer BlockRangeBuffer(BlockPayload, OffsetAndLength.first, OffsetAndLength.second); std::vector WriteAttachmentBuffers; std::vector WriteRawHashes; uint64_t PotentialSize = 0; uint64_t UsedSize = 0; uint64_t BlockPartSize = BlockRangeBuffer.GetSize(); uint32_t OffsetInBlock = 0;
// Walk the chunks covered by this range in block order. OffsetInBlock is advanced by every chunk's
// compressed length (needed or not) so it always points at the current chunk inside BlockRangeBuffer.
for (uint32_t ChunkBlockIndex = BlockRange.ChunkBlockIndexStart; ChunkBlockIndex < BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount; ChunkBlockIndex++) { if (AbortFlag) { break; } const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; if (auto ChunkIndexIt = AllNeededPartialChunkHashesLookup.find(ChunkHash); ChunkIndexIt != AllNeededPartialChunkHashesLookup.end()) { if (!ChunkDownloadedFlags[ChunkIndexIt->second]) { IoHash VerifyChunkHash; uint64_t VerifyChunkSize; CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed( SharedBuffer(IoBuffer(BlockRangeBuffer, OffsetInBlock, ChunkCompressedSize)), VerifyChunkHash, VerifyChunkSize); std::string ErrorString; if (!CompressedChunk) { ErrorString = fmt::format( "Chunk at {},{} in block attachment '{}' is not a valid compressed buffer", OffsetInBlock, ChunkCompressedSize, BlockDescription.BlockHash); } else if (VerifyChunkHash != ChunkHash) { ErrorString = fmt::format( "Chunk at {},{} in block attachment '{}' has mismatching hash, expected " "{}, got {}", OffsetInBlock, ChunkCompressedSize, BlockDescription.BlockHash, ChunkHash, VerifyChunkHash); } else if (VerifyChunkSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) { ErrorString = fmt::format( "Chunk at {},{} in block attachment '{}' has mismatching raw size, " "expected {}, " "got {}", 
OffsetInBlock, ChunkCompressedSize, BlockDescription.BlockHash, BlockDescription.ChunkRawLengths[ChunkBlockIndex], VerifyChunkSize); }
// A bad chunk either triggers a re-download of this payload's ranges (RetriesLeft > 0) or is
// reported as a missing attachment, which throws unless missing attachments are ignored.
// NOTE(review): the retry path returns from the task immediately. Chunks of the current range that
// were already pushed into WriteAttachmentBuffers have had their ChunkDownloadedFlags entry set,
// but are not passed to AddChunks on this path, and the retry will skip them because the flag is
// set - confirm those chunks are not lost.
if (!ErrorString.empty()) { if (RetriesLeft > 0) { ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); return DownloadAndSavePartialBlock(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, BlockRangeStartIndex, RangeCount, AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, RetriesLeft - 1); } ReportMessage(Context.OptionalJobContext, ErrorString); Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError("Malformed chunk block", gsl::narrow(HttpResponseCode::NotFound), ErrorString); } } else { bool Expected = false; if (ChunkDownloadedFlags[ChunkIndexIt->second].compare_exchange_strong(Expected, true)) { WriteAttachmentBuffers.emplace_back( CompressedChunk.GetCompressed().Flatten().AsIoBuffer()); WriteRawHashes.emplace_back(ChunkHash); PotentialSize += WriteAttachmentBuffers.back().GetSize(); } } } } OffsetInBlock += ChunkCompressedSize; } if (!WriteAttachmentBuffers.empty()) { std::vector Results = Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < Results.size(); Index++) { const CidStore::InsertResult& Result = Results[Index]; if (Result.New) { Info.AttachmentBytesStored.fetch_add(WriteAttachmentBuffers[Index].GetSize()); Info.AttachmentsStored.fetch_add(1); UsedSize += WriteAttachmentBuffers[Index].GetSize(); } } Info.ChunksCompleteCount += WriteAttachmentBuffers.size(); if (UsedSize < BlockPartSize) { ZEN_DEBUG( "Used {} (skipping {}) out of {} for block {} range {}, {} ({} %) (use of matching " "{}%)", NiceBytes(UsedSize), NiceBytes(BlockPartSize - UsedSize), NiceBytes(BlockPartSize), BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength, (100 * UsedSize) / BlockPartSize, PotentialSize 
> 0 ? (UsedSize * 100) / PotentialSize : 0); } } } } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed saving {} ranges from block attachment {}", OffsetAndLengths.size(), BlockDescription.BlockHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }); if (!AbortFlag) { ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", BlockRangeCount, BlockDescription.BlockHash, NiceTimeSpanMs(static_cast(DownloadElapsedSeconds * 1000)), NiceBytes(DownloadedBytes)); } } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to download block attachment {} ranges", BlockDescription.BlockHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); };
// DownloadAndSaveAttachment
// Schedules a task on Context.NetworkWorkerPool that fetches the single attachment RawHash - from
// Context.OptionalCache when configured, otherwise (or on a cache miss) from Context.RemoteStore,
// populating the cache afterwards when Context.PopulateCache is set - and then schedules a task on
// Context.WorkerPool that stores it with Context.ChunkStore.AddChunk.
// A failed download is reported and counted in Info.MissingAttachmentCount; it throws a
// RemoteStoreError unless Context.IgnoreMissingAttachments is set. Other std::exceptions are
// rewrapped as RemoteStoreError with InternalServerError.
// RawHash is copied into the tasks; the remaining arguments are captured by reference and must
// outlive the scheduled work.
void DownloadAndSaveAttachment(LoadOplogContext& Context, ParallelWork& AttachmentWork, DownloadInfo& Info, Stopwatch& LoadAttachmentsTimer, std::atomic_uint64_t& DownloadStartMS, const IoHash& RawHash) { AttachmentWork.ScheduleWork( Context.NetworkWorkerPool, [&Context, &AttachmentWork, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, &Info](std::atomic& AbortFlag) { ZEN_TRACE_CPU("DownloadAttachment"); ZEN_SCOPED_LOG(Context.Log); if (AbortFlag) { return; } try { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); IoBuffer BlobBuffer; if (Context.OptionalCache) { BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, RawHash); } if (!BlobBuffer) { RemoteProjectStore::LoadAttachmentResult AttachmentResult = Context.RemoteStore.LoadAttachment(RawHash); if (AttachmentResult.ErrorCode) { ReportMessage(Context.OptionalJobContext, fmt::format("Failed to download large attachment {}: '{}', error code: {}", RawHash, AttachmentResult.Reason, AttachmentResult.ErrorCode)); 
Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError(AttachmentResult.Reason, AttachmentResult.ErrorCode, AttachmentResult.Text); } return; } ZEN_ASSERT(AttachmentResult.Bytes); BlobBuffer = std::move(AttachmentResult.Bytes); ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(AttachmentResult.ElapsedSeconds * 1000)), NiceBytes(BlobBuffer.GetSize())); if (Context.OptionalCache && Context.PopulateCache) { Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, RawHash, BlobBuffer.GetContentType(), CompositeBuffer(SharedBuffer(BlobBuffer))); } } if (AbortFlag) { return; } uint64_t AttachmentSize = BlobBuffer.GetSize(); Info.AttachmentsDownloaded.fetch_add(1); Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); ZEN_ASSERT(BlobBuffer);
// Second stage on the general worker pool: the downloaded bytes are moved into the task and added
// to the chunk store. The "stored" counters only grow when the store reports the chunk as New;
// Info.ChunksCompleteCount is incremented either way.
AttachmentWork.ScheduleWork( Context.WorkerPool, [&Context, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)](std::atomic& AbortFlag) { ZEN_TRACE_CPU("WriteAttachment"); ZEN_ASSERT(Bytes); if (AbortFlag) { return; } CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(AttachmentSize); Info.AttachmentsStored.fetch_add(1); } Info.ChunksCompleteCount++; }, WorkerThreadPool::EMode::EnableBacklog); } catch (const RemoteStoreError&) { throw; } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Loading attachment {} failed", RawHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); };
// AsyncCreateBlock
// Schedules a task on WorkerPool that builds one compressed chunk block from ChunksInBlock using
// GenerateChunkBlock, records the block description in Blocks[BlockIndex] and passes the compressed
// block together with its description to AsyncOnBlock.
//   ChunksInBlock   - chunks for the block; moved into the task (must not be empty, asserted)
//   SectionsLock    - taken in shared mode while writing Blocks[BlockIndex]
//   Blocks          - destination vector; the task writes only the element at BlockIndex
//   OptionalContext - may be null; polled for cancellation, which sets AbortFlag
// Blocks, SectionsLock and AsyncOnBlock are captured by reference and must outlive the task.
void AsyncCreateBlock(LoggerRef InLog, ParallelWork& Work, WorkerThreadPool& WorkerPool, std::vector>&& ChunksInBlock, RwLock& SectionsLock, std::vector& Blocks, size_t BlockIndex, const std::function& AsyncOnBlock, JobContext* OptionalContext) { Work.ScheduleWork(WorkerPool, [InLog, &Blocks, &SectionsLock, BlockIndex, Chunks = std::move(ChunksInBlock), &AsyncOnBlock, OptionalContext]( 
std::atomic& AbortFlag) mutable { ZEN_TRACE_CPU("CreateBlock"); ZEN_SCOPED_LOG(InLog); if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } if (AbortFlag) { return; }
// ChunkCount and BlockSize are captured before Chunks / CompressedBlock are moved from, so the
// final log line can still report them.
size_t ChunkCount = Chunks.size(); ZEN_ASSERT(ChunkCount > 0); Stopwatch Timer; ChunkBlockDescription Block; CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); uint64_t BlockSize = CompressedBlock.GetCompressedSize(); { // We can share the lock as we are not resizing the vector and only touch our own index RwLock::SharedLockScope __(SectionsLock); Blocks[BlockIndex] = Block; } AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); ZEN_INFO("Generated block with {} attachments in {} ({})", ChunkCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(BlockSize)); }); } struct UploadInfo { uint64_t OplogSizeBytes = 0; std::atomic AttachmentsUploaded = 0; std::atomic AttachmentBlocksUploaded = 0; std::atomic AttachmentBytesUploaded = 0; std::atomic AttachmentBlockBytesUploaded = 0; }; struct CreatedBlock { CompositeBuffer Payload; ChunkBlockDescription Block; }; void UploadAttachments(WorkerThreadPool& WorkerPool, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, const std::unordered_set& LargeAttachments, const std::vector>>& BlockChunks, const std::unordered_map& CreatedBlocks, const tsl::robin_map& LooseFileAttachments, const std::unordered_set& Needs, bool ForceAll, UploadInfo& Info, JobContext* OptionalContext) { using namespace std::literals; if (Needs.empty() && !ForceAll) { return; } ReportMessage(OptionalContext, "Filtering needed attachments for upload..."); std::unordered_set AttachmentsToUpload; std::unordered_map BulkBlockAttachmentsToUpload; size_t BlockAttachmentCountToUpload = 0; size_t LargeAttachmentCountToUpload = 0; size_t BulkAttachmentCountToUpload = 0; AttachmentsToUpload.reserve(ForceAll ? 
// Definitions that begin on the preceding line:
//   UploadInfo   - upload counters; the std::atomic members are updated with fetch_add by the
//                  upload tasks below.
//   CreatedBlock - a generated block's payload together with its ChunkBlockDescription.
//   UploadAttachments - uploads to RemoteStore every attachment listed in Needs (or everything
//                  known, when ForceAll is set). Returns immediately when Needs is empty and
//                  ForceAll is false. Created blocks and large attachments are saved one per task
//                  with SaveAttachment; chunks from BlockChunks are saved per group with
//                  SaveAttachments. Progress and a final summary go to OptionalContext (may be
//                  null), which is also polled for cancellation. Throws RemoteStoreError when a
//                  needed attachment is unknown, cannot be found, or fails to save.
CreatedBlocks.size() + LargeAttachments.size() : Needs.size());
// UnknownAttachments starts as a copy of Needs; every hash found among the created blocks, large
// attachments or block chunks is erased, so whatever remains could not be matched to any source.
std::unordered_set UnknownAttachments(Needs); for (const auto& CreatedBlock : CreatedBlocks) { if (ForceAll || Needs.contains(CreatedBlock.first)) { AttachmentsToUpload.insert(CreatedBlock.first); BlockAttachmentCountToUpload++; UnknownAttachments.erase(CreatedBlock.first); } } for (const IoHash& LargeAttachment : LargeAttachments) { if (ForceAll || Needs.contains(LargeAttachment)) { AttachmentsToUpload.insert(LargeAttachment); LargeAttachmentCountToUpload++; UnknownAttachments.erase(LargeAttachment); } } for (const std::vector>& BlockHashes : BlockChunks) { for (const std::pair& Chunk : BlockHashes) { if (ForceAll || Needs.contains(Chunk.first)) { BulkBlockAttachmentsToUpload.insert(std::make_pair(Chunk.first, Chunk.second)); BulkAttachmentCountToUpload++; UnknownAttachments.erase(Chunk.first); } } }
// NOTE(review): this early return runs before the UnknownAttachments check below. If Needs is
// non-empty but none of its hashes match a known source, the function reports "No attachments
// needed" and returns instead of throwing - confirm that ordering is intended.
if (AttachmentsToUpload.empty() && BulkBlockAttachmentsToUpload.empty()) { ReportMessage(OptionalContext, "No attachments needed"); return; } if (!UnknownAttachments.empty()) { throw RemoteStoreError( fmt::format("Upload requested of {} missing attachments, the base container referenced blocks that are no longer available", UnknownAttachments.size()), gsl::narrow(HttpResponseCode::NotFound), ""); } if (IsCancelled(OptionalContext)) { return; } ReportMessage(OptionalContext, fmt::format("Saving {} attachments ({} blocks, {} attachments, {} bulk attachments)", AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, BulkAttachmentCountToUpload)); Stopwatch Timer; std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
// AttachmentsToSave counts scheduled tasks (one per block / large attachment, one per chunk
// group); it is the Total passed to ReportProgress further down.
ptrdiff_t AttachmentsToSave(0); for (const IoHash& RawHash : AttachmentsToUpload) { if (AbortFlag.load()) { break; } AttachmentsToSave++; Work.ScheduleWork( WorkerPool, [&ChunkStore, &RemoteStore, RawHash, &CreatedBlocks, 
&LooseFileAttachments, &Info, OptionalContext]( std::atomic& AbortFlag) { ZEN_TRACE_CPU("UploadAttachment"); if (AbortFlag.load()) { return; }
// Payload lookup order: created block, then loose file attachment (fetched through its stored
// callback), then the local chunk store.
CompositeBuffer Payload; ChunkBlockDescription Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second.Payload; Block = BlockIt->second.Block; } else if (auto LooseTmpFileIt = LooseFileAttachments.find(RawHash); LooseTmpFileIt != LooseFileAttachments.end()) { Payload = LooseTmpFileIt->second(RawHash); } else { Payload = CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); } if (!Payload) { throw RemoteStoreError(fmt::format("Failed to find attachment {}", RawHash), gsl::narrow(HttpResponseCode::NotFound), {}); }
// IsBlock and PayloadSize are read before Payload and Block are moved into SaveAttachment.
const bool IsBlock = Block.BlockHash == RawHash; size_t PayloadSize = Payload.GetSize(); RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(std::move(Payload), RawHash, std::move(Block)); if (Result.ErrorCode) { throw RemoteStoreError(fmt::format("Failed to save attachment '{}', {}", RawHash, NiceBytes(PayloadSize)), Result.ErrorCode, Result.Text); } if (IsBlock) { Info.AttachmentBlocksUploaded.fetch_add(1); Info.AttachmentBlockBytesUploaded.fetch_add(PayloadSize); ZEN_INFO("Saved block attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(PayloadSize)); } else { Info.AttachmentsUploaded.fetch_add(1); Info.AttachmentBytesUploaded.fetch_add(PayloadSize); ZEN_INFO("Saved large attachment '{}' in {} ({})", RawHash, NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(PayloadSize)); } }); } if (IsCancelled(OptionalContext)) { AbortFlag = true; }
// Bulk chunks: for each group in BlockChunks, upload in a single SaveAttachments call the chunks
// that were selected above and are not already being uploaded individually.
if (!BulkBlockAttachmentsToUpload.empty()) { for (const std::vector>& Chunks : BlockChunks) { if (AbortFlag.load()) { break; } std::vector NeededChunks; NeededChunks.reserve(Chunks.size()); for (const std::pair& Chunk : Chunks) { const IoHash& ChunkHash = Chunk.first; if 
(BulkBlockAttachmentsToUpload.contains(ChunkHash) && !AttachmentsToUpload.contains(ChunkHash)) { NeededChunks.push_back(Chunk.first); } } if (NeededChunks.empty()) { continue; } AttachmentsToSave++; Work.ScheduleWork( WorkerPool, [&RemoteStore, &ChunkStore, NeededChunks = std::move(NeededChunks), &BulkBlockAttachmentsToUpload, &Info, OptionalContext](std::atomic& AbortFlag) { ZEN_TRACE_CPU("UploadChunk"); if (AbortFlag.load()) { return; } size_t ChunksSize = 0; std::vector ChunkBuffers; ChunkBuffers.reserve(NeededChunks.size()); for (const IoHash& Chunk : NeededChunks) { auto It = BulkBlockAttachmentsToUpload.find(Chunk); ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); CompositeBuffer ChunkPayload = It->second(It->first).second; if (!ChunkPayload) { throw RemoteStoreError(fmt::format("Missing chunk {}"sv, Chunk), static_cast(HttpResponseCode::NotFound), fmt::format("Unable to fetch attachment {} required by the oplog"sv, Chunk)); } ChunksSize += ChunkPayload.GetSize(); ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer())); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) { throw RemoteStoreError(fmt::format("Failed to save attachments with {} chunks", NeededChunks.size()), Result.ErrorCode, Result.Text); } Info.AttachmentsUploaded.fetch_add(ChunkBuffers.size()); Info.AttachmentBytesUploaded.fetch_add(ChunksSize); ZEN_INFO("Saved {} bulk attachments in {} ({})", NeededChunks.size(), NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000)), NiceBytes(ChunksSize)); }); } }
// Wait for all scheduled uploads. The callback is given to Work.Wait together with the value 1000;
// it propagates cancellation into AbortFlag and reports the remaining task count plus transfer stats.
Stopwatch SaveAttachmentsProgressTimer; Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t Remaining) { ZEN_UNUSED(IsAborted, IsPaused); if (IsCancelled(OptionalContext) && !AbortFlag.load()) { AbortFlag = true; } uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); ReportProgress(OptionalContext, "Saving attachments"sv, fmt::format("{} remaining... 
{}", Remaining, GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)), AttachmentsToSave, Remaining, SaveAttachmentsProgressTimer.GetElapsedTimeMs()); }); uint64_t ElapsedTimeMS = Timer.GetElapsedTimeMs(); if (AttachmentsToSave > 0) { ReportProgress(OptionalContext, "Saving attachments"sv, fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)), AttachmentsToSave, 0, SaveAttachmentsProgressTimer.GetElapsedTimeMs()); } ReportMessage(OptionalContext, fmt::format("Saved {} attachments ({} blocks, {} attachments, {} bulk attachments) in {} {}", AttachmentsToUpload.size() + BulkBlockAttachmentsToUpload.size(), BlockAttachmentCountToUpload, LargeAttachmentCountToUpload, BulkAttachmentCountToUpload, NiceTimeSpanMs(ElapsedTimeMS), GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS))); } } // namespace remotestore_impl std::vector GetBlockHashesFromOplog(CbObjectView ContainerObject) { using namespace std::literals; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); std::vector BlockHashes; BlockHashes.reserve(BlocksArray.Num()); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); BlockHashes.push_back(BlockHash); } return BlockHashes; } std::vector GetBlocksFromOplog(CbObjectView ContainerObject, std::span IncludeBlockHashes) { using namespace std::literals; std::vector Result; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); tsl::robin_set IncludeSet; IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); Result.reserve(IncludeBlockHashes.size()); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); if (IncludeSet.contains(BlockHash)) { std::vector ChunkHashes; CbArrayView ChunksArray = 
// The two free functions defined on the preceding line read the container's "blocks" array:
//   GetBlockHashesFromOplog returns the "rawhash" of every entry, in array order.
//   GetBlocksFromOplog (continuing here) returns, for every entry whose "rawhash" is contained in
//   IncludeBlockHashes, a ThinChunkBlockDescription holding that hash and the hashes read from the
//   entry's "chunks" array. Entries whose hash equals IoHash::Zero are skipped.
BlockView["chunks"sv].AsArrayView(); if (BlockHash == IoHash::Zero) { continue; } ChunkHashes.reserve(ChunksArray.Num()); for (CbFieldView ChunkField : ChunksArray) { ChunkHashes.push_back(ChunkField.AsHash()); } Result.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); } } return Result; } CbObject BuildContainer(LoggerRef InLog, CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, const std::vector& KnownBlocks, WorkerThreadPool& WorkerPool, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function>&&)>& OnBlockChunks, bool EmbedLooseFiles, JobContext* OptionalContext) { using namespace std::literals; ZEN_SCOPED_LOG(InLog); remotestore_impl::JobContextLogger JobContextOutput(OptionalContext); Stopwatch Timer; CbObject OplogContainerObject; CompressedBuffer CompressedOpsSection; std::unordered_map UploadAttachments; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); size_t TotalOpCount = Oplog.GetOplogEntryCount(); Stopwatch RewriteOplogTimer; CbObject SectionOps = remotestore_impl::RewriteOplog(InLog, Project, Oplog, IgnoreMissingAttachments, EmbedLooseFiles, AttachmentTempPath, UploadAttachments, OptionalContext); remotestore_impl::ReportMessage(OptionalContext, fmt::format("Rewrote {} ops to new oplog in {}", TotalOpCount, NiceTimeSpanMs(static_cast(RewriteOplogTimer.GetElapsedTimeMs())))); { Stopwatch CompressOpsTimer; CompressedOpsSection = CompressedBuffer::Compress(SectionOps.GetBuffer(), OodleCompressor::Mermaid, OodleCompressionLevel::Fast); remotestore_impl::ReportMessage(OptionalContext, fmt::format("Compressed oplog section {} ({} -> {}) in {}", CompressedOpsSection.DecodeRawHash(), 
NiceBytes(CompressedOpsSection.DecodeRawSize()), NiceBytes(CompressedOpsSection.GetCompressedSize()), NiceTimeSpanMs(static_cast(CompressOpsTimer.GetElapsedTimeMs())))); } if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } std::unordered_set FoundHashes; FoundHashes.reserve(UploadAttachments.size()); for (const auto& It : UploadAttachments) { FoundHashes.insert(It.first); } std::unordered_set MissingHashes; std::vector AttachmentsToChunk; remotestore_impl::FindChunkSizes(ChunkStore, WorkerPool, MaxChunkEmbedSize, ChunkFileSizeLimit, AllowChunking, AttachmentTempPath, UploadAttachments, MissingHashes, AttachmentsToChunk, OptionalContext); if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } for (const IoHash& AttachmentHash : MissingHashes) { auto It = UploadAttachments.find(AttachmentHash); ZEN_ASSERT(It != UploadAttachments.end()); std::optional Op = Oplog.GetOpByKey(It->second.Key); ZEN_ASSERT(Op.has_value()); if (IgnoreMissingAttachments) { remotestore_impl::ReportMessage(OptionalContext, fmt::format("Missing attachment '{}' for op '{}'", AttachmentHash, It->second.Key)); } else { ExtendableStringBuilder<1024> Sb; Sb.Append("Failed to find attachment '"); Sb.Append(AttachmentHash.ToHexString()); Sb.Append("' for op: \n"); Op.value().ToJson(Sb); throw std::runtime_error(Sb.ToString()); } UploadAttachments.erase(AttachmentHash); } std::vector ChunkedFiles = ChunkAttachments(WorkerPool, AttachmentsToChunk, OptionalContext); if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } for (const remotestore_impl::ChunkedFile& Chunked : ChunkedFiles) { UploadAttachments.erase(Chunked.Chunked.Info.RawHash); for (const IoHash& ChunkHash : Chunked.Chunked.Info.ChunkHashes) { UploadAttachments.erase(ChunkHash); } } size_t ChunkedChunkCount = std::accumulate( ChunkedFiles.begin(), ChunkedFiles.end(), size_t(0), [](size_t Current, const remotestore_impl::ChunkedFile& Value) { return Current + Value.Chunked.Info.ChunkHashes.size(); }); 
size_t ReusedAttachmentCount = 0; std::vector ReusedBlockIndexes; { std::unordered_set UniqueChunkHashes; UniqueChunkHashes.reserve(FoundHashes.size() + ChunkedChunkCount); UniqueChunkHashes.insert(FoundHashes.begin(), FoundHashes.end()); for (remotestore_impl::ChunkedFile& Chunked : ChunkedFiles) { UniqueChunkHashes.insert(Chunked.Chunked.Info.ChunkHashes.begin(), Chunked.Chunked.Info.ChunkHashes.end()); } std::vector ChunkHashes(UniqueChunkHashes.begin(), UniqueChunkHashes.end()); std::vector ChunkIndexes; ChunkIndexes.resize(ChunkHashes.size()); std::iota(ChunkIndexes.begin(), ChunkIndexes.end(), 0); std::vector UnusedChunkIndexes; ReuseBlocksStatistics ReuseBlocksStats; ReusedBlockIndexes = FindReuseBlocks(JobContextOutput.Log(), /*BlockReuseMinPercentLimit*/ 80, /*IsVerbose*/ false, ReuseBlocksStats, KnownBlocks, ChunkHashes, ChunkIndexes, UnusedChunkIndexes); for (size_t KnownBlockIndex : ReusedBlockIndexes) { const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (UploadAttachments.erase(KnownHash) == 1) { ReusedAttachmentCount++; } } } } std::unordered_map LargeChunkAttachments; std::unordered_map, IoHash::Hasher> LooseUploadAttachments; if (UploadAttachments.empty()) { if (ReusedAttachmentCount != 0) { remotestore_impl::ReportMessage( OptionalContext, fmt::format("Found all {} attachments from {} ops in existing blocks", ReusedAttachmentCount, TotalOpCount)); } } else { const size_t TotalAttachmentCount = UploadAttachments.size() + ReusedAttachmentCount; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops ({} ({:.1f}%) found in existing blocks)", UploadAttachments.size(), TotalOpCount, ReusedAttachmentCount, (100.f * ReusedAttachmentCount) / TotalAttachmentCount)); ResolveAttachments(ChunkStore, WorkerPool, MaxChunkEmbedSize, AttachmentTempPath, UploadAttachments, LargeChunkAttachments, LooseUploadAttachments, OptionalContext); 
if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } } std::unordered_set LargeChunkHashes; for (auto& It : LargeChunkAttachments) { UploadAttachments.erase(It.first); LargeChunkHashes.insert(It.first); OnLargeAttachment(It.first, std::move(It.second)); } RwLock BlocksLock; std::vector Blocks; std::vector> SortedUploadAttachments; SortedUploadAttachments.reserve(UploadAttachments.size()); for (const auto& It : UploadAttachments) { SortedUploadAttachments.push_back(std::make_pair(It.first, It.second.Key)); } if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Sorting {} attachments from {} ops", SortedUploadAttachments.size(), TotalOpCount)); // Sort attachments so we get predictable blocks for the same oplog upload std::sort(SortedUploadAttachments.begin(), SortedUploadAttachments.end(), [](const std::pair& Lhs, const std::pair& Rhs) { if (Lhs.second == Rhs.second) { // Same key, sort by raw hash return Lhs.first < Rhs.first; } // Sort by key return Lhs.second < Rhs.second; }); std::vector ChunkedFilesOrder; ChunkedFilesOrder.reserve(ChunkedFiles.size()); for (size_t Index = 0; Index < ChunkedFiles.size(); Index++) { ChunkedFilesOrder.push_back(Index); } std::sort(ChunkedFilesOrder.begin(), ChunkedFilesOrder.end(), [&ChunkedFiles](size_t Lhs, size_t Rhs) { return ChunkedFiles[Lhs].Chunked.Info.RawHash < ChunkedFiles[Rhs].Chunked.Info.RawHash; }); if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } remotestore_impl::ReportMessage( OptionalContext, fmt::format("Assembling {} attachments and {} chunked parts from {} ops into blocks and loose attachments", SortedUploadAttachments.size(), ChunkedChunkCount, TotalOpCount)); if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } size_t ChunkAssembleCount = SortedUploadAttachments.size() + ChunkedChunkCount; size_t ChunksAssembled = 0; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Assembling 
{} attachments from {} ops into blocks", ChunkAssembleCount, TotalOpCount)); std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); uint32_t ComposedBlocks = 0; uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); { Stopwatch BlockCreateProgressTimer; remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{ .MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = MaxChunksPerBlock, .MaxChunkEmbedSize = MaxChunkEmbedSize, .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); auto OnNewBlock = [&Log, &Work, &WorkerPool, BuildBlocks, &BlockCreateProgressTimer, &BlocksLock, &Blocks, &AsyncOnBlock, &OnBlockChunks, ChunkAssembleCount, &ChunksAssembled, &ComposedBlocks, OptionalContext](std::vector&& ChunkRawHashes, const std::function& FetchAttachmentResolver) { size_t ChunkCount = ChunkRawHashes.size(); std::vector> ChunksInBlock; ChunksInBlock.reserve(ChunkCount); for (const IoHash& AttachmentHash : ChunkRawHashes) { ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash))); } size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); if (BuildBlocks) { remotestore_impl::AsyncCreateBlock(Log(), Work, WorkerPool, std::move(ChunksInBlock), BlocksLock, Blocks, BlockIndex, AsyncOnBlock, OptionalContext); } else { ZEN_INFO("Bulk group {} attachments", ChunkCount); // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); OnBlockChunks(std::move(ChunksInBlock)); } ChunksAssembled += ChunkCount; ComposedBlocks++; if (ChunksAssembled % 1000 == 0) { remotestore_impl::ReportProgress( OptionalContext, "Assembling blocks"sv, fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), 
ChunkAssembleCount, ChunkAssembleCount - ChunksAssembled, BlockCreateProgressTimer.GetElapsedTimeMs()); } }; { std::vector AttachmentHashes; AttachmentHashes.reserve(SortedUploadAttachments.size()); std::vector AttachmentSizes; AttachmentSizes.reserve(SortedUploadAttachments.size()); std::vector AttachmentKeys; AttachmentKeys.reserve(SortedUploadAttachments.size()); for (const std::pair& Attachment : SortedUploadAttachments) { AttachmentHashes.push_back(Attachment.first); if (auto It = UploadAttachments.find(Attachment.first); It != UploadAttachments.end()) { AttachmentSizes.push_back(It->second.Size); } else { throw std::runtime_error( fmt::format("Attachment to upload state inconsistent, could not find attachment {}", Attachment.first)); } AttachmentKeys.push_back(Attachment.second); } auto FetchWholeAttachmentResolver = [&LooseUploadAttachments, &ChunkStore](const IoHash& AttachmentHash) -> FetchChunkFunc { if (auto It = LooseUploadAttachments.find(AttachmentHash); It != LooseUploadAttachments.end()) { uint64_t RawSize = It->second.first; IoBuffer Payload = std::move(It->second.second); return [RawSize, Payload = std::move(Payload)](const IoHash& ChunkHash) mutable -> std::pair { ZEN_UNUSED(ChunkHash); return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))}; }; } else { return [&ChunkStore](const IoHash& RawHash) -> std::pair { IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); if (!Chunk) { throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); } // These are small chunks - make memory resident Chunk = IoBufferBuilder::ReadFromFileMaybe(Chunk); IoHash ValidateRawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), ValidateRawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (not a compressed buffer)", RawHash)); } if (RawHash != ValidateRawHash) { throw 
std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); } return {RawSize, Compressed.GetCompressed()}; }; } }; Composer.Compose(AttachmentHashes, AttachmentSizes, AttachmentKeys, [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector&& ChunkRawHashes) { OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver); }); } { std::vector AttachmentHashes; AttachmentHashes.reserve(ChunkedChunkCount); std::vector AttachmentSizes; AttachmentSizes.reserve(ChunkedChunkCount); std::vector AttachmentKeys; AttachmentKeys.reserve(ChunkedChunkCount); tsl::robin_map, IoHash::Hasher> ChunkHashToChunkFileIndexAndChunkIndex; for (size_t ChunkedFileIndex : ChunkedFilesOrder) { const remotestore_impl::ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; const ChunkedInfoWithSource& Chunked = ChunkedFile.Chunked; size_t ChunkCount = Chunked.Info.ChunkHashes.size(); Oid ChunkedFileOid = Oid::NewOid(); for (size_t ChunkIndex = 0; ChunkIndex < ChunkCount; ChunkIndex++) { const IoHash& ChunkHash = Chunked.Info.ChunkHashes[ChunkIndex]; uint64_t ChunkSize = Chunked.ChunkSources[ChunkIndex].Size; { if (ChunkHashToChunkFileIndexAndChunkIndex .insert(std::make_pair(ChunkHash, std::make_pair(ChunkedFileIndex, ChunkIndex))) .second) { if (ChunkSize > MaxChunkEmbedSize) { OnLargeAttachment(ChunkHash, [SourceBuffer = ChunkedFile.Source, ChunkSource = Chunked.ChunkSources[ChunkIndex], ChunkHash](const IoHash& RawHash) -> CompositeBuffer { ZEN_ASSERT(RawHash == ChunkHash); CompressedBuffer Compressed = CompressedBuffer::Compress( SharedBuffer(IoBuffer(SourceBuffer, ChunkSource.Offset, ChunkSource.Size)), OodleCompressor::Mermaid, OodleCompressionLevel::None); return Compressed.GetCompressed(); }); LargeChunkHashes.insert(ChunkHash); } else { AttachmentHashes.push_back(ChunkHash); AttachmentSizes.push_back(ChunkSize); AttachmentKeys.push_back(ChunkedFileOid); } } } } } auto ChunkedFileAttachmentResolver = 
[&ChunkHashToChunkFileIndexAndChunkIndex, &ChunkedFiles](const IoHash& AttachmentHash) -> FetchChunkFunc { if (auto It = ChunkHashToChunkFileIndexAndChunkIndex.find(AttachmentHash); It != ChunkHashToChunkFileIndexAndChunkIndex.end()) { const std::pair& ChunkFileIndexAndChunkIndex = It->second; size_t ChunkedFileIndex = ChunkFileIndexAndChunkIndex.first; size_t ChunkIndex = ChunkFileIndexAndChunkIndex.second; const remotestore_impl::ChunkedFile& ChunkedFile = ChunkedFiles[ChunkedFileIndex]; const ChunkSource& Source = ChunkedFile.Chunked.ChunkSources[ChunkIndex]; ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize()); return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( const IoHash&) -> std::pair { return {Size, CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), OodleCompressor::Mermaid, OodleCompressionLevel::None) .GetCompressed()}; }; } else { ZEN_ASSERT(false); } }; Composer.Compose(AttachmentHashes, AttachmentSizes, AttachmentKeys, [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector&& ChunkRawHashes) { OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver); }); } if (remotestore_impl::IsCancelled(OptionalContext)) { Work.Abort(); } Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { AbortFlag.store(true); } remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, fmt::format("{}{} remaining...", AbortFlag.load() ? 
"Aborting, " : "", PendingWork), ComposedBlocks, PendingWork, BlockCreateProgressTimer.GetElapsedTimeMs()); }); if (!AbortFlag.load() && ComposedBlocks > 0) { remotestore_impl::ReportProgress(OptionalContext, "Creating blocks"sv, ""sv, ComposedBlocks, 0, BlockCreateProgressTimer.GetElapsedTimeMs()); uint64_t NowMS = Timer.GetElapsedTimeMs(); remotestore_impl::ReportMessage( OptionalContext, fmt::format("Created {} blocks in {}", ComposedBlocks, NiceTimeSpanMs(NowMS - CreateBlocksStartMS))); } } if (remotestore_impl::IsCancelled(OptionalContext)) { return {}; } // Reused blocks were not composed (their chunks were erased from UploadAttachments) but must // still appear in the container so that a fresh receiver knows to download them. if (BuildBlocks) { for (size_t KnownBlockIndex : ReusedBlockIndexes) { const ChunkBlockDescription& Reused = KnownBlocks[KnownBlockIndex]; Blocks.push_back(Reused); } } CbObjectWriter OplogContainerWriter; RwLock::SharedLockScope _(BlocksLock); OplogContainerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); OplogContainerWriter.BeginArray("blocks"sv); { for (const ChunkBlockDescription& B : Blocks) { ZEN_ASSERT(!B.ChunkRawHashes.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); OplogContainerWriter.BeginObject(); { OplogContainerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContainerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContainerWriter.AddHash(RawHash); } } OplogContainerWriter.EndArray(); // "chunks" } OplogContainerWriter.EndObject(); continue; } ZEN_ASSERT(B.BlockHash == IoHash::Zero); OplogContainerWriter.BeginObject(); { OplogContainerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContainerWriter.AddBinaryAttachment(RawHash); } } OplogContainerWriter.EndArray(); } OplogContainerWriter.EndObject(); } } OplogContainerWriter.EndArray(); // "blocks"sv 
OplogContainerWriter.BeginArray("chunkedfiles"sv); { for (const remotestore_impl::ChunkedFile& F : ChunkedFiles) { OplogContainerWriter.BeginObject(); { OplogContainerWriter.AddHash("rawhash"sv, F.Chunked.Info.RawHash); OplogContainerWriter.AddInteger("rawsize"sv, F.Chunked.Info.RawSize); OplogContainerWriter.BeginArray("chunks"sv); { for (const IoHash& RawHash : F.Chunked.Info.ChunkHashes) { OplogContainerWriter.AddHash(RawHash); } } OplogContainerWriter.EndArray(); // "chunks" OplogContainerWriter.BeginArray("sequence"sv); { for (uint32_t ChunkIndex : F.Chunked.Info.ChunkSequence) { OplogContainerWriter.AddInteger(ChunkIndex); } } OplogContainerWriter.EndArray(); // "sequence" } OplogContainerWriter.EndObject(); } } OplogContainerWriter.EndArray(); // "chunkedfiles"sv OplogContainerWriter.BeginArray("chunks"sv); { for (const IoHash& AttachmentHash : LargeChunkHashes) { OplogContainerWriter.AddBinaryAttachment(AttachmentHash); } } OplogContainerWriter.EndArray(); // "chunks" OplogContainerObject = OplogContainerWriter.Save(); return OplogContainerObject; } CbObject BuildContainer(LoggerRef InLog, CidStore& ChunkStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, const std::function& AsyncOnBlock, const std::function& OnLargeAttachment, const std::function>&&)>& OnBlockChunks, bool EmbedLooseFiles) { return BuildContainer(InLog, ChunkStore, Project, Oplog, MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, ChunkFileSizeLimit, BuildBlocks, IgnoreMissingAttachments, AllowChunking, {}, WorkerPool, AsyncOnBlock, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles, /*OptionalContext*/ nullptr); } void SaveOplog(LoggerRef InLog, CidStore& ChunkStore, RemoteProjectStore& RemoteStore, ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, WorkerThreadPool& 
NetworkWorkerPool, WorkerThreadPool& WorkerPool, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, size_t ChunkFileSizeLimit, bool EmbedLooseFiles, bool ForceUpload, bool IgnoreMissingAttachments, JobContext* OptionalContext) { using namespace std::literals; ZEN_SCOPED_LOG(InLog); Stopwatch Timer; remotestore_impl::UploadInfo Info; const RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); std::filesystem::path AttachmentTempPath; if (RemoteStoreInfo.UseTempBlockFiles) { AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); CreateDirectories(AttachmentTempPath); } RwLock AttachmentsLock; std::unordered_set LargeAttachments; std::unordered_map CreatedBlocks; tsl::robin_map LooseLargeFiles; auto MakeTempBlock = [&Log, AttachmentTempPath, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); const uint64_t BlockSize = BlockBuffer.GetSize(); RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert( {Block.BlockHash, {.Payload = CompositeBuffer(SharedBuffer(std::move(BlockBuffer))), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockSize)); }; auto UploadBlock = [&Log, &RemoteStore, &RemoteStoreInfo, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; uint64_t CompressedSize = CompressedBlock.GetCompressedSize(); RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); if (Result.ErrorCode) { throw RemoteStoreError(fmt::format("Failed to save block attachment {} for oplog '{}': {}", BlockHash, RemoteStoreInfo.ContainerName, 
Result.Reason), Result.ErrorCode, Result.Text); } Info.AttachmentBlocksUploaded.fetch_add(1); Info.AttachmentBlockBytesUploaded.fetch_add(CompressedSize); ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedSize)); }; std::vector>> BlockChunks; auto OnBlockChunks = [&Log, &BlockChunks](std::vector>&& Chunks) { BlockChunks.push_back({std::make_move_iterator(Chunks.begin()), std::make_move_iterator(Chunks.end())}); ZEN_DEBUG("Found {} block chunks", Chunks.size()); }; auto OnLargeAttachment = [&Log, &AttachmentsLock, &LargeAttachments, &LooseLargeFiles](const IoHash& AttachmentHash, TGetAttachmentBufferFunc&& GetBufferFunc) { { RwLock::ExclusiveLockScope _(AttachmentsLock); LargeAttachments.insert(AttachmentHash); LooseLargeFiles.insert_or_assign(AttachmentHash, std::move(GetBufferFunc)); } ZEN_DEBUG("Found attachment {}", AttachmentHash); }; std::function OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; } else { OnBlock = UploadBlock; } std::vector KnownBlocks; uint64_t TransferWallTimeMS = 0; RemoteProjectStore::CreateContainerResult ContainerResult = RemoteStore.CreateContainer(); if (ContainerResult.ErrorCode) { throw RemoteStoreError( fmt::format("Failed to create container for oplog '{}': {}", RemoteStoreInfo.ContainerName, ContainerResult.Reason), ContainerResult.ErrorCode, ContainerResult.Text); } if (RemoteStoreInfo.CreateBlocks) { remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching known blocks from '{}'", RemoteStoreInfo.Description)); Stopwatch GetKnownBlocksTimer; RemoteProjectStore::GetKnownBlocksResult KnownBlocksResult = RemoteStore.GetKnownBlocks(); TransferWallTimeMS += GetKnownBlocksTimer.GetElapsedTimeMs(); for (ChunkBlockDescription& BlockDescription : KnownBlocksResult.Blocks) { if (BlockDescription.ChunkRawLengths.empty()) { ZEN_ASSERT(BlockDescription.ChunkCompressedLengths.empty()); size_t ChunkCount = BlockDescription.ChunkRawHashes.size(); if (ChunkCount > 0) { // Fake sizes, will give 
usage number of number of chunks used rather than bytes used - better than nothing BlockDescription.ChunkRawLengths.resize(ChunkCount, 1); BlockDescription.ChunkCompressedLengths.resize(ChunkCount, 1); } } } if (KnownBlocksResult.ErrorCode == static_cast(HttpResponseCode::NoContent)) { remotestore_impl::ReportMessage(OptionalContext, fmt::format("No known blocks in '{}', uploading all attachments", RemoteStoreInfo.Description)); } else if (KnownBlocksResult.ErrorCode) { remotestore_impl::ReportMessage(OptionalContext, fmt::format("Failed to get known blocks from '{}' ({}): {}, uploading all attachments", RemoteStoreInfo.Description, KnownBlocksResult.ErrorCode, KnownBlocksResult.Reason)); } else { remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetched {} known blocks from '{}' in {}", KnownBlocksResult.Blocks.size(), RemoteStoreInfo.Description, NiceTimeSpanMs(static_cast(KnownBlocksResult.ElapsedSeconds * 1000.0)))); KnownBlocks = std::move(KnownBlocksResult.Blocks); } } CbObject OplogContainerObject = BuildContainer(InLog, ChunkStore, Project, Oplog, MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, ChunkFileSizeLimit, RemoteStoreInfo.CreateBlocks, IgnoreMissingAttachments, RemoteStoreInfo.AllowChunking, KnownBlocks, WorkerPool, OnBlock, OnLargeAttachment, OnBlockChunks, EmbedLooseFiles, OptionalContext); if (remotestore_impl::IsCancelled(OptionalContext)) { return; } Info.OplogSizeBytes = OplogContainerObject.GetSize(); if (remotestore_impl::IsCancelled(OptionalContext)) { return; } uint64_t ChunkCount = OplogContainerObject["chunks"sv].AsArrayView().Num(); uint64_t BlockCount = OplogContainerObject["blocks"sv].AsArrayView().Num(); remotestore_impl::ReportMessage(OptionalContext, fmt::format("Saving oplog container '{}' with {} attachments and {} blocks...", RemoteStoreInfo.ContainerName, ChunkCount, BlockCount)); Stopwatch SaveContainerTimer; IoBuffer ContainerPayload = OplogContainerObject.GetBuffer().AsIoBuffer(); 
ContainerPayload.SetContentType(ZenContentType::kCbObject); RemoteProjectStore::SaveResult ContainerSaveResult = RemoteStore.SaveContainer(std::move(ContainerPayload)); TransferWallTimeMS += SaveContainerTimer.GetElapsedTimeMs(); if (ContainerSaveResult.ErrorCode) { throw RemoteStoreError( fmt::format("Failed to save oplog container for oplog '{}': {}", RemoteStoreInfo.ContainerName, ContainerSaveResult.Reason), ContainerSaveResult.ErrorCode, ContainerSaveResult.Text); } else { remotestore_impl::ReportMessage(OptionalContext, fmt::format("Saved container '{}' in {}", RemoteStoreInfo.ContainerName, NiceTimeSpanMs(static_cast(ContainerSaveResult.ElapsedSeconds * 1000.0)))); } { Stopwatch UploadAttachmentsTimer; UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, LooseLargeFiles, ContainerSaveResult.Needs, ForceUpload, Info, OptionalContext); TransferWallTimeMS += UploadAttachmentsTimer.GetElapsedTimeMs(); const uint32_t MaxTries = 8; uint32_t Try = 0; while (Try < MaxTries) { if (remotestore_impl::IsCancelled(OptionalContext)) { return; } remotestore_impl::ReportMessage(OptionalContext, "Finalizing oplog container..."); RemoteProjectStore::FinalizeResult ContainerFinalizeResult = RemoteStore.FinalizeContainer(ContainerSaveResult.RawHash); if (ContainerFinalizeResult.ErrorCode) { throw RemoteStoreError( fmt::format("Failed to finalize oplog container {}: {}", ContainerSaveResult.RawHash, ContainerFinalizeResult.Reason), ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Text); } remotestore_impl::ReportMessage( OptionalContext, fmt::format("Finalized container '{}' in {}", RemoteStoreInfo.ContainerName, NiceTimeSpanMs(static_cast(ContainerFinalizeResult.ElapsedSeconds * 1000.0)))); if (ContainerFinalizeResult.Needs.empty()) { break; } if (remotestore_impl::IsCancelled(OptionalContext)) { return; } Try++; if (Try == MaxTries) { throw std::runtime_error( fmt::format("Giving up finalize oplog container {} 
after {} retries, still getting reports of missing attachments", ContainerSaveResult.RawHash, Try)); } remotestore_impl::ReportMessage( OptionalContext, fmt::format("Finalize of container '{}' reported {} missing attachments. Uploading missing attachments. Try {}", RemoteStoreInfo.ContainerName, ContainerFinalizeResult.Needs.size(), Try)); Stopwatch RetryUploadAttachmentsTimer; UploadAttachments(NetworkWorkerPool, ChunkStore, RemoteStore, LargeAttachments, BlockChunks, CreatedBlocks, LooseLargeFiles, ContainerFinalizeResult.Needs, false, Info, OptionalContext); TransferWallTimeMS += RetryUploadAttachmentsTimer.GetElapsedTimeMs(); } } LooseLargeFiles.clear(); CreatedBlocks.clear(); remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); remotestore_impl::ReportMessage( OptionalContext, fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", RemoteStoreInfo.ContainerName, "SUCCESS", NiceTimeSpanMs(Timer.GetElapsedTimeMs()), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksUploaded.load(), NiceBytes(Info.AttachmentBlockBytesUploaded.load()), Info.AttachmentsUploaded.load(), NiceBytes(Info.AttachmentBytesUploaded.load()), remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS))); } RemoteProjectStore::Result ParseOplogContainer( const CbObject& ContainerObject, const std::function RawHashes)>& OnReferencedAttachments, const std::function& HasAttachment, const std::function&& NeededChunkIndexes)>& OnNeedBlock, const std::function& OnNeedAttachment, const std::function& OnChunkedAttachment, CbObject& OutOplogSection, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); CbValidateError 
ValidateResult = CbValidateError::None; if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult); ValidateResult == CbValidateError::None && SectionObject) { OutOplogSection = SectionObject; } else { remotestore_impl::ReportMessage( OptionalContext, fmt::format("Failed to read oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); return RemoteProjectStore::Result{gsl::narrow(HttpResponseCode::BadRequest), Timer.GetElapsedTimeMs() / 1000.0, "Section has unexpected data type", "Failed to read oplog container"}; } std::unordered_set NeededAttachments; { CbArrayView OpsArray = OutOplogSection["ops"sv].AsArrayView(); size_t OpCount = OpsArray.Num(); size_t OpsCompleteCount = 0; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Scanning {} ops for attachments", OpCount)); Stopwatch ScanOplogProgressTimer; uint64_t LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs(); for (CbFieldView OpEntry : OpsArray) { OpEntry.IterateAttachments([&](CbFieldView FieldView) { NeededAttachments.insert(FieldView.AsAttachment()); }); if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } OpsCompleteCount++; if (ScanOplogProgressTimer.GetElapsedTimeMs() - LastReportTimeMs > 200) { remotestore_impl::ReportProgress( OptionalContext, "Scanning oplog"sv, fmt::format("{} attachments found, {} ops remaining...", NeededAttachments.size(), OpCount - OpsCompleteCount), OpCount, OpCount - OpsCompleteCount, ScanOplogProgressTimer.GetElapsedTimeMs()); LastReportTimeMs = ScanOplogProgressTimer.GetElapsedTimeMs(); } } remotestore_impl::ReportProgress(OptionalContext, "Scanning oplog"sv, fmt::format("{} attachments found", NeededAttachments.size()), OpCount, OpCount - OpsCompleteCount, ScanOplogProgressTimer.GetElapsedTimeMs()); 
} { std::vector ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); OnReferencedAttachments(ReferencedAttachments); } if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Oplog references {} attachments", NeededAttachments.size())); CbArrayView ChunkedFilesArray = ContainerObject["chunkedfiles"sv].AsArrayView(); for (CbFieldView ChunkedFileField : ChunkedFilesArray) { CbObjectView ChunkedFileView = ChunkedFileField.AsObjectView(); IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); if (NeededAttachments.erase(RawHash) == 1) { if (!HasAttachment(RawHash)) { ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); size_t NeededChunkAttachmentCount = 0; OnReferencedAttachments(Chunked.ChunkHashes); for (const IoHash& ChunkHash : Chunked.ChunkHashes) { if (!HasAttachment(ChunkHash)) { if (NeededAttachments.insert(ChunkHash).second) { NeededChunkAttachmentCount++; } } } OnChunkedAttachment(Chunked); remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting chunked attachment '{}' ({}) built from {} chunks, need {} chunks", Chunked.RawHash, NiceBytes(Chunked.RawSize), Chunked.ChunkHashes.size(), NeededChunkAttachmentCount)); } } if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } } std::vector ThinBlocksDescriptions; size_t NeedBlockCount = 0; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); for (CbFieldView BlockField : BlocksArray) { CbObjectView BlockView = BlockField.AsObjectView(); IoHash BlockHash = BlockView["rawhash"sv].AsBinaryAttachment(); CbArrayView ChunksArray = BlockView["chunks"sv].AsArrayView(); 
std::vector ChunkHashes; ChunkHashes.reserve(ChunksArray.Num()); for (CbFieldView ChunkField : ChunksArray) { ChunkHashes.push_back(ChunkField.AsHash()); } ThinBlocksDescriptions.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); } for (ThinChunkBlockDescription& ThinBlockDescription : ThinBlocksDescriptions) { std::vector NeededBlockChunkIndexes; for (uint32_t ChunkIndex = 0; ChunkIndex < ThinBlockDescription.ChunkRawHashes.size(); ChunkIndex++) { const IoHash& ChunkHash = ThinBlockDescription.ChunkRawHashes[ChunkIndex]; if (NeededAttachments.erase(ChunkHash) == 1) { if (!HasAttachment(ChunkHash)) { NeededBlockChunkIndexes.push_back(ChunkIndex); } } } if (!NeededBlockChunkIndexes.empty()) { if (ThinBlockDescription.BlockHash != IoHash::Zero) { NeedBlockCount++; } OnNeedBlock(std::move(ThinBlockDescription), std::move(NeededBlockChunkIndexes)); } if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting {} of {} attachment blocks", NeedBlockCount, BlocksArray.Num())); size_t NeedAttachmentCount = 0; CbArrayView LargeChunksArray = ContainerObject["chunks"sv].AsArrayView(); for (CbFieldView LargeChunksField : LargeChunksArray) { IoHash AttachmentHash = LargeChunksField.AsBinaryAttachment(); if (NeededAttachments.erase(AttachmentHash) == 1) { if (!HasAttachment(AttachmentHash)) { OnNeedAttachment(AttachmentHash); NeedAttachmentCount++; } } if (remotestore_impl::IsCancelled(OptionalContext)) { return RemoteProjectStore::Result{.ErrorCode = gsl::narrow(HttpResponseCode::OK), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = "Operation cancelled"}; } } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Requesting {} of {} large attachments", 
NeedAttachmentCount, LargeChunksArray.Num())); return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } RemoteProjectStore::Result SaveOplogContainer( ProjectStore::Oplog& Oplog, const CbObject& ContainerObject, const std::function RawHashes)>& OnReferencedAttachments, const std::function& HasAttachment, const std::function&& NeededChunkIndexes)>& OnNeedBlock, const std::function& OnNeedAttachment, const std::function& OnChunkedAttachment, JobContext* OptionalContext) { using namespace std::literals; Stopwatch Timer; CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(ContainerObject, OnReferencedAttachments, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, OplogSection, OptionalContext); if (Result.ErrorCode != 0) { return Result; } Result = remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } void LoadOplog(LoadOplogContext&& Context) { using namespace std::literals; ZEN_SCOPED_LOG(Context.Log); remotestore_impl::JobContextLogger JobContextOutput(Context.OptionalJobContext); remotestore_impl::DownloadInfo Info; Stopwatch Timer; std::unordered_set Attachments; uint64_t BlockCountToDownload = 0; RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = Context.RemoteStore.GetInfo(); remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); uint64_t TransferWallTimeMS = 0; Stopwatch LoadContainerTimer; RemoteProjectStore::LoadContainerResult LoadContainerResult = Context.RemoteStore.LoadContainer(); TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs(); if (LoadContainerResult.ErrorCode) { remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); throw RemoteStoreError( 
fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode), LoadContainerResult.ErrorCode, LoadContainerResult.Text); } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded container in {} ({})", NiceTimeSpanMs(static_cast(LoadContainerResult.ElapsedSeconds * 1000)), NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); Info.OplogSizeBytes = LoadContainerResult.ContainerObject.GetSize(); std::atomic AbortFlag(false); std::atomic PauseFlag(false); ParallelWork AttachmentWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic_size_t AttachmentCount = 0; Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; auto HasAttachment = [&Context](const IoHash& RawHash) { if (Context.ForceDownload) { return false; } if (Context.ChunkStore.ContainsChunk(RawHash)) { return true; } return false; }; struct NeededBlockDownload { ThinChunkBlockDescription ThinBlockDescription; std::vector NeededChunkIndexes; }; std::vector NeededBlockDownloads; auto OnNeedBlock = [&Context, &AttachmentWork, &AbortFlag, &AttachmentCount, &BlockCountToDownload, &Info, &LoadAttachmentsTimer, &DownloadStartMS, &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription, std::vector&& NeededChunkIndexes) { if (AbortFlag.load()) { return; } BlockCountToDownload++; AttachmentCount.fetch_add(1); if (ThinBlockDescription.BlockHash == IoHash::Zero) { DownloadAndSaveBlockChunks(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, std::move(ThinBlockDescription), std::move(NeededChunkIndexes)); } else { NeededBlockDownloads.push_back(NeededBlockDownload{.ThinBlockDescription = std::move(ThinBlockDescription), .NeededChunkIndexes = std::move(NeededChunkIndexes)}); } }; std::vector AttachmentsToDownload; auto OnNeedAttachment = [&AttachmentsToDownload, &AbortFlag, &Attachments, &AttachmentCount](const IoHash& RawHash) { if 
(!Attachments.insert(RawHash).second) { return; } if (AbortFlag.load()) { return; } AttachmentCount.fetch_add(1); AttachmentsToDownload.push_back(RawHash); }; std::vector FilesToDechunk; auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); }; auto OnReferencedAttachments = [&Context](std::span RawHashes) { Context.Oplog.CaptureAddedAttachments(RawHashes); }; // Make sure we retain any attachments we download before writing the oplog Context.Oplog.EnableUpdateCapture(); auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); }); CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, OnReferencedAttachments, HasAttachment, OnNeedBlock, OnNeedAttachment, OnChunkedAttachment, OplogSection, Context.OptionalJobContext); if (Result.ErrorCode != 0) { AbortFlag = true; AttachmentWork.Wait(); throw RemoteStoreError(Result.Reason, Result.ErrorCode, Result.Text); } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", NiceTimeSpanMs(static_cast(Result.ElapsedSeconds * 1000.0)), Attachments.size(), BlockCountToDownload, FilesToDechunk.size())); std::vector BlockHashes; std::vector AllNeededChunkHashes; BlockHashes.reserve(NeededBlockDownloads.size()); for (const NeededBlockDownload& BlockDownload : NeededBlockDownloads) { BlockHashes.push_back(BlockDownload.ThinBlockDescription.BlockHash); for (uint32_t ChunkIndex : BlockDownload.NeededChunkIndexes) { AllNeededChunkHashes.push_back(BlockDownload.ThinBlockDescription.ChunkRawHashes[ChunkIndex]); } } tsl::robin_map AllNeededPartialChunkHashesLookup = BuildHashLookup(AllNeededChunkHashes); std::vector> ChunkDownloadedFlags(AllNeededChunkHashes.size()); std::vector DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); ChunkBlockAnalyser::BlockResult PartialBlocksResult; 
remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size())); RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = Context.RemoteStore.GetBlockDescriptions(BlockHashes, Context.OptionalCache, Context.CacheBuildId); remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("GetBlockDescriptions took {}. Found {} blocks", NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)), BlockDescriptions.Blocks.size())); std::vector BlocksWithDescription; BlocksWithDescription.reserve(BlockDescriptions.Blocks.size()); for (const ChunkBlockDescription& BlockDescription : BlockDescriptions.Blocks) { BlocksWithDescription.push_back(BlockDescription.BlockHash); } { auto WantIt = NeededBlockDownloads.begin(); auto FindIt = BlockDescriptions.Blocks.begin(); while (WantIt != NeededBlockDownloads.end()) { if (FindIt == BlockDescriptions.Blocks.end()) { // Fall back to full download as we can't get enough information about the block DownloadAndSaveBlock(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, WantIt->ThinBlockDescription.BlockHash, AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, 3); for (uint32_t BlockChunkIndex : WantIt->NeededChunkIndexes) { const IoHash& ChunkHash = WantIt->ThinBlockDescription.ChunkRawHashes[BlockChunkIndex]; auto It = AllNeededPartialChunkHashesLookup.find(ChunkHash); ZEN_ASSERT(It != AllNeededPartialChunkHashesLookup.end()); uint32_t ChunkIndex = It->second; DownloadedViaLegacyChunkFlag[ChunkIndex] = true; } WantIt++; } else if (WantIt->ThinBlockDescription.BlockHash == FindIt->BlockHash) { // Found FindIt++; WantIt++; } else { // Not a requested block? 
Ignore it FindIt++; } } } std::vector BlockExistsInCache(BlocksWithDescription.size(), false); if (!AllNeededChunkHashes.empty()) { std::vector PartialBlockDownloadModes; if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else { if (Context.OptionalCache) { std::vector CacheExistsResult = Context.OptionalCache->BlobsExists(Context.CacheBuildId, BlocksWithDescription); if (CacheExistsResult.size() == BlocksWithDescription.size()) { for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++) { BlockExistsInCache[BlobIndex] = CacheExistsResult[BlobIndex].HasBody; } } uint64_t FoundBlocks = std::accumulate(BlockExistsInCache.begin(), BlockExistsInCache.end(), uint64_t(0u), [](uint64_t Current, bool Exists) -> uint64_t { return Current + (Exists ? 1 : 0); }); if (FoundBlocks > 0) { remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Found {} out of {} blocks in cache", FoundBlocks, BlockExistsInCache.size())); } } ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; switch (Context.PartialBlockRequestMode) { case EPartialBlockRequestMode::Off: break; case EPartialBlockRequestMode::ZenCacheOnly: CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; break; case EPartialBlockRequestMode::Mixed: CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 ? 
ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; break; case EPartialBlockRequestMode::All: CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; CloudPartialDownloadMode = Context.StoreMaxRangeCountPerRequest > 1 ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; break; } PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); for (uint32_t BlockIndex = 0; BlockIndex < BlocksWithDescription.size(); BlockIndex++) { const bool BlockExistInCache = BlockExistsInCache[BlockIndex]; PartialBlockDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode); } } ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); ChunkBlockAnalyser PartialAnalyser( JobContextOutput.Log(), BlockDescriptions.Blocks, ChunkBlockAnalyser::Options{.IsQuiet = false, .IsVerbose = false, .HostLatencySec = Context.StoreLatencySec, .HostHighSpeedLatencySec = Context.CacheLatencySec, .HostMaxRangeCountPerRequest = Context.StoreMaxRangeCountPerRequest, .HostHighSpeedMaxRangeCountPerRequest = Context.CacheMaxRangeCountPerRequest}); std::vector NeededBlocks = PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, [&](uint32_t ChunkIndex) { return !DownloadedViaLegacyChunkFlag[ChunkIndex]; }); PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); } Stopwatch AttachmentsDownloadProgressTimer; for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) { DownloadAndSaveBlock(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, BlockDescriptions.Blocks[FullBlockIndex].BlockHash, 
AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, 3); } for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocksResult.BlockRanges.size();) { size_t RangeCount = 1; size_t RangesLeft = PartialBlocksResult.BlockRanges.size() - BlockRangeIndex; const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocksResult.BlockRanges[BlockRangeIndex]; while (RangeCount < RangesLeft && CurrentBlockRange.BlockIndex == PartialBlocksResult.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) { RangeCount++; } DownloadAndSavePartialBlock(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, BlockDescriptions.Blocks[CurrentBlockRange.BlockIndex], BlockExistsInCache[CurrentBlockRange.BlockIndex], PartialBlocksResult.BlockRanges, BlockRangeIndex, RangeCount, AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, /* RetriesLeft*/ 3); BlockRangeIndex += RangeCount; } for (const IoHash& AttachmentToDownload : AttachmentsToDownload) { DownloadAndSaveAttachment(Context, AttachmentWork, Info, LoadAttachmentsTimer, DownloadStartMS, AttachmentToDownload); } uint64_t TotalChunksToDownload = AllNeededChunkHashes.size() + AttachmentsToDownload.size(); AttachmentWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t /*Pending*/) { if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { AbortFlag = true; } uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; if (DownloadStartMS != (uint64_t)-1) { PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } uint64_t CompletedChunkCount = Info.ChunksCompleteCount.load(); uint64_t AttachmentsDownloaded = Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); 
remotestore_impl::ReportProgress( Context.OptionalJobContext, "Loading attachments"sv, fmt::format("{}/{} ({}) chunks. {} ({}) blobs downloaded. {}", CompletedChunkCount, TotalChunksToDownload, NiceBytes(Info.AttachmentBytesStored.load()), AttachmentsDownloaded, NiceBytes(AttachmentBytesDownloaded), remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), TotalChunksToDownload, TotalChunksToDownload - CompletedChunkCount, AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); }); if (DownloadStartMS != (uint64_t)-1) { TransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } if (AttachmentCount.load() > 0) { remotestore_impl::ReportProgress(Context.OptionalJobContext, "Loading attachments", ""sv, AttachmentCount.load(), 0, AttachmentsDownloadProgressTimer.GetElapsedTimeMs()); } if (!FilesToDechunk.empty()) { remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); ParallelWork DechunkWork(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::filesystem::path TempFilePath = Context.Oplog.TempPath(); for (size_t ChunkedIndex = 0; ChunkedIndex < FilesToDechunk.size(); ChunkedIndex++) { const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkWork.ScheduleWork( Context.WorkerPool, [&Log, &Context, TempFileName, &FilesToDechunk, ChunkedIndex, &Info](std::atomic& AbortFlag) { ZEN_TRACE_CPU("DechunkAttachment"); auto _ = MakeGuard([&Log, &TempFileName] { std::error_code Ec; if (IsFile(TempFileName, Ec)) { RemoveFile(TempFileName, Ec); if (Ec) { ZEN_INFO("Failed to remove temporary file '{}'. 
Reason: {}", TempFileName, Ec.message()); } } }); const ChunkedInfo& Chunked = FilesToDechunk[ChunkedIndex]; try { if (AbortFlag.load()) { return; } Stopwatch Timer; IoBuffer TmpBuffer; { BasicFile TmpFile; std::error_code Ec; TmpFile.Open(TempFileName, BasicFile::Mode::kTruncate, Ec); if (Ec) { throw RemoteStoreError( "Write error", gsl::narrow(HttpResponseCode::InternalServerError), fmt::format("Failed to open temp file {} for chunked attachment {}", TempFileName, Chunked.RawHash)); } else { BasicFileWriter TmpWriter(TmpFile, 64u * 1024u); uint64_t ChunkOffset = CompressedBuffer::GetHeaderSizeForNoneEncoder(); BLAKE3Stream HashingStream; for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { if (AbortFlag.load()) { return; } const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError( "Missing chunk", gsl::narrow(HttpResponseCode::NotFound), fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); } return; } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize); if (RawHash != ChunkHash || !Compressed) { std::string Message = Compressed ? 
fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", RawHash, ChunkHash, Chunked.RawHash) : fmt::format("Malformed data for chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash); remotestore_impl::ReportMessage(Context.OptionalJobContext, Message); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError("Missing chunk", gsl::narrow(HttpResponseCode::NotFound), Message); } return; } { ZEN_TRACE_CPU("DecompressChunk"); if (!Compressed.DecompressToStream( 0, RawSize, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset, SourceSize, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { MemoryView SegmentData = Segment.GetView(); HashingStream.Append(SegmentData); TmpWriter.Write(SegmentData.GetData(), SegmentData.GetSize(), ChunkOffset + Offset); } return true; })) { remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); if (!Context.IgnoreMissingAttachments) { throw RemoteStoreError( "Missing chunk", gsl::narrow(HttpResponseCode::NotFound), fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); } return; } } ChunkOffset += RawSize; } BLAKE3 RawHash = HashingStream.GetHash(); ZEN_ASSERT(Chunked.RawHash == IoHash::FromBLAKE3(RawHash)); UniqueBuffer Header = CompressedBuffer::CreateHeaderForNoneEncoder(Chunked.RawSize, RawHash); TmpWriter.Write(Header.GetData(), Header.GetSize(), 0); } TmpFile.Close(); TmpBuffer = IoBufferBuilder::MakeFromTemporaryFile(TempFileName); } uint64_t TmpBufferSize = TmpBuffer.GetSize(); CidStore::InsertResult 
InsertResult = Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(TmpBufferSize); Info.AttachmentsStored.fetch_add(1); } ZEN_INFO("Dechunked attachment {} ({}) in {}", Chunked.RawHash, NiceBytes(Chunked.RawSize), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } catch (const std::exception& Ex) { throw RemoteStoreError(fmt::format("Failed to dechunk file {}", Chunked.RawHash), gsl::narrow(HttpResponseCode::InternalServerError), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); } Stopwatch DechunkProgressTimer; DechunkWork.Wait(1000, [&](bool /*IsAborted*/, bool /*IsPaused*/, std::ptrdiff_t Remaining) { if (remotestore_impl::IsCancelled(Context.OptionalJobContext) && !AbortFlag) { AbortFlag = true; } remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, fmt::format("{} remaining...", Remaining), FilesToDechunk.size(), Remaining, DechunkProgressTimer.GetElapsedTimeMs()); }); remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0, DechunkProgressTimer.GetElapsedTimeMs()); } if (Context.CleanOplog) { if (Context.OptionalCache) { Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; }); } if (!Context.Oplog.Reset()) { std::string Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId()); remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Aborting ({}): {}", gsl::narrow(HttpResponseCode::InternalServerError), Reason)); throw RemoteStoreError(Reason, gsl::narrow(HttpResponseCode::InternalServerError), ""); } } { RemoteProjectStore::Result WriteResult = remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); if (WriteResult.ErrorCode) { remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Aborting ({}): {}", 
WriteResult.ErrorCode, WriteResult.Reason)); throw RemoteStoreError(WriteResult.Reason, WriteResult.ErrorCode, WriteResult.Text); } } remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats()); { std::string DownloadDetails; RemoteProjectStore::ExtendedStats ExtendedStats; if (Context.RemoteStore.GetExtendedStats(ExtendedStats)) { if (!ExtendedStats.m_ReceivedBytesPerSource.empty()) { uint64_t Total = 0; ExtendableStringBuilder<128> SB; for (auto& It : ExtendedStats.m_ReceivedBytesPerSource) { if (SB.Size() > 0) { SB.Append(", "sv); } SB.Append(It.first); SB.Append(": "sv); SB.Append(NiceBytes(It.second)); Total += It.second; } remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView())); } } } uint64_t TotalDownloads = 1 + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); remotestore_impl::ReportMessage( Context.OptionalJobContext, fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", RemoteStoreInfo.ContainerName, "SUCCESS", NiceTimeSpanMs(static_cast(Timer.GetElapsedTimeMs())), NiceBytes(Info.OplogSizeBytes), Info.AttachmentBlocksDownloaded.load(), NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), Info.AttachmentBlocksRangesDownloaded.load(), NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), Info.AttachmentsDownloaded.load(), NiceBytes(Info.AttachmentBytesDownloaded.load()), TotalDownloads, NiceBytes(TotalBytesDownloaded), Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Info.MissingAttachmentCount.load(), remotestore_impl::GetStats(Context.RemoteStore.GetStats(), 
Context.OptionalCacheStats, TransferWallTimeMS))); } ChunkedInfo ReadChunkedInfo(CbObjectView ChunkedFile) { using namespace std::literals; ChunkedInfo Chunked; Chunked.RawHash = ChunkedFile["rawhash"sv].AsHash(); Chunked.RawSize = ChunkedFile["rawsize"sv].AsUInt64(); CbArrayView ChunksArray = ChunkedFile["chunks"sv].AsArrayView(); Chunked.ChunkHashes.reserve(ChunksArray.Num()); for (CbFieldView ChunkField : ChunksArray) { const IoHash ChunkHash = ChunkField.AsHash(); Chunked.ChunkHashes.emplace_back(ChunkHash); } CbArrayView SequenceArray = ChunkedFile["sequence"sv].AsArrayView(); Chunked.ChunkSequence.reserve(SequenceArray.Num()); for (CbFieldView SequenceField : SequenceArray) { uint32_t SequenceIndex = SequenceField.AsUInt32(); ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); Chunked.ChunkSequence.push_back(SequenceIndex); } return Chunked; } ////////////////////////////////////////////////////////////////////////// // These are here to avoid vtable leakage RemoteProjectStore::RemoteProjectStore() { } RemoteProjectStore::~RemoteProjectStore() { } #if ZEN_WITH_TESTS namespace projectstore_testutils { using namespace std::literals; static std::string OidAsString(const Oid& Id) { StringBuilder<25> OidStringBuilder; Id.ToString(OidStringBuilder); return OidStringBuilder.ToString(); } static CbPackage CreateBulkDataOplogPackage(const Oid& Id, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("bulkdata"); for (const auto& Attachment : Attachments) { CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash()); Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "type"sv << "Standard"sv; Object << "data"sv << Attach; Object.EndObject(); Package.AddAttachment(Attach); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; static CbPackage CreateFilesOplogPackage(const Oid& Id, const std::filesystem::path 
ProjectRootDir, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("files"); for (const auto& Attachment : Attachments) { std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir).generic_string(); std::filesystem::path ClientPath = ServerPath; // dummy Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "serverpath"sv << ServerPath.string(); Object << "clientpath"sv << ClientPath.string(); Object.EndObject(); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; // Variant of CreateFilesOplogPackage where each entry includes a "data" field of // CbFieldType::Hash set to IoHash::Zero. CbFieldView::AsHash() returns Zero for a // plain Hash field whose stored value is zero, so RewriteOp still enters the rewrite // path (DataHash == Zero) and calls RewriteCbObject, which then finds the pre-existing // "data" field, triggering the return-true branch at line 1858. 
static CbPackage CreateFilesOplogPackageWithZeroDataHash(const Oid& Id, const std::filesystem::path ProjectRootDir, const std::span>& Attachments) { CbPackage Package; CbObjectWriter Object; Object << "key"sv << OidAsString(Id); if (!Attachments.empty()) { Object.BeginArray("files"); for (const auto& Attachment : Attachments) { std::filesystem::path ServerPath = std::filesystem::relative(Attachment.second, ProjectRootDir).generic_string(); std::filesystem::path ClientPath = ServerPath; // dummy Object.BeginObject(); Object << "id"sv << Attachment.first; Object << "serverpath"sv << ServerPath.string(); Object << "clientpath"sv << ClientPath.string(); Object.AddHash("data"sv, IoHash::Zero); Object.EndObject(); } Object.EndArray(); } Package.SetObject(Object.Save()); return Package; }; static std::vector> CreateAttachments( const std::span& Sizes, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, uint64_t BlockSize = 0) { std::vector> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateSemiRandomBlob(Size)), OodleCompressor::Mermaid, CompressionLevel, BlockSize); Result.emplace_back(std::pair(Oid::NewOid(), Compressed)); } return Result; } static std::vector> CreateFileAttachments(const std::filesystem::path& RootDir, const std::span& Sizes) { std::vector> Result; Result.reserve(Sizes.size()); for (size_t Size : Sizes) { IoBuffer FileBlob = CreateRandomBlob(Size); IoHash FileHash = IoHash::HashBuffer(FileBlob); std::filesystem::path UncompressedFilePath = RootDir / "content" / "uncompressed_file" / FileHash.ToHexString(); CreateDirectories(UncompressedFilePath.parent_path()); WriteFile(UncompressedFilePath, FileBlob); Result.push_back({Oid::NewOid(), UncompressedFilePath}); } return Result; } struct CapturingJobContext : public JobContext { bool IsCancelled() const override { return m_Cancel; } void ReportMessage(std::string_view Message) override { 
RwLock::ExclusiveLockScope _(m_Lock); Messages.emplace_back(Message); } void ReportProgress(std::string_view Op, std::string_view Details, ptrdiff_t, ptrdiff_t, uint64_t) override { RwLock::ExclusiveLockScope _(m_Lock); ProgressMessages.emplace_back(fmt::format("{}: {}", Op, Details)); } bool HasMessage(std::string_view Substr) const { RwLock::SharedLockScope _(m_Lock); return std::any_of(Messages.begin(), Messages.end(), [Substr](const std::string& M) { return M.find(Substr) != std::string::npos; }); } bool m_Cancel = false; std::vector Messages; std::vector ProgressMessages; private: mutable RwLock m_Lock; }; // Worker pool pair with separate NetworkPool and WorkerPool. struct TestWorkerPools { private: uint32_t m_NetworkCount; uint32_t m_WorkerCount; public: WorkerThreadPool NetworkPool; WorkerThreadPool WorkerPool; TestWorkerPools() : m_NetworkCount(Max(GetHardwareConcurrency() / 4u, 2u)) , m_WorkerCount(m_NetworkCount < GetHardwareConcurrency() ? Max(GetHardwareConcurrency() - m_NetworkCount, 4u) : 4u) , NetworkPool(m_NetworkCount) , WorkerPool(m_WorkerCount) { } }; inline uint32_t GetWorkerCount() { return Max(GetHardwareConcurrency() / 4u, 2u); } inline IoHash MakeTestHash(uint8_t Index) { uint8_t Data[20] = {}; Data[0] = Index; return IoHash::MakeFrom(Data); } inline Oid MakeTestOid(uint32_t Index) { uint32_t Data[3] = {Index, 0, 0}; return Oid::FromMemory(Data); } // MaxChunks must be <= 127 (so MeasureVarUInt(MaxChunks) == 1) and MaxChunkEmbedSize is // fixed at 100 to keep header sizes deterministic in BlockComposer tests. 
inline remotestore_impl::BlockComposer::Configuration MakeTestConfig(uint64_t UsableSize, uint64_t MaxChunks) { constexpr uint64_t MaxChunkEmbedSize = 100; uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(MaxChunks) + MeasureVarUInt(MaxChunkEmbedSize) * MaxChunks; return remotestore_impl::BlockComposer::Configuration{ .MaxBlockSize = UsableSize + MaxHeaderSize, .MaxChunksPerBlock = MaxChunks, .MaxChunkEmbedSize = MaxChunkEmbedSize, }; } } // namespace projectstore_testutils TEST_SUITE_BEGIN("remotestore.projectstore"); struct ExportForceDisableBlocksTrue_ForceTempBlocksFalse { static const bool ForceDisableBlocks = true; static const bool ForceEnableTempBlocks = false; }; struct ExportForceDisableBlocksFalse_ForceTempBlocksFalse { static const bool ForceDisableBlocks = false; static const bool ForceEnableTempBlocks = false; }; struct ExportForceDisableBlocksFalse_ForceTempBlocksTrue { static const bool ForceDisableBlocks = false; static const bool ForceEnableTempBlocks = true; }; TEST_CASE_TEMPLATE("project.store.export", Settings, ExportForceDisableBlocksTrue_ForceTempBlocksFalse, ExportForceDisableBlocksFalse_ForceTempBlocksFalse, ExportForceDisableBlocksFalse_ForceTempBlocksTrue) { using namespace std::literals; using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; ProjectStore ProjectStore(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / 
"game.uproject"; Ref Project(ProjectStore.NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); Ref Oplog = Project->NewOplog("oplog1", {}); CHECK(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{77}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{7123, 583, 690, 99}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{55, 122}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list{423 * 1024, 2 * 1024, 3213, 762 * 1024}))); FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = 32 * 1024u, .ChunkFileSizeLimit = 64u * 1024u}, /*.FolderPath = */ ExportDir.Path(), /*.Name = */ std::string("oplog1"), /*OptionalBaseName = */ std::string(), /*.ForceDisableBlocks = */ Settings::ForceDisableBlocks, /*.ForceEnableTempBlocks = */ Settings::ForceEnableTempBlocks}; std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; SaveOplog(Log(), CidStore, *RemoteStore, *Project.Get(), *Oplog, NetworkPool, WorkerPool, Options.MaxBlockSize, Options.MaxChunksPerBlock, Options.MaxChunkEmbedSize, Options.ChunkFileSizeLimit, true, false, false, nullptr); Ref OplogImport = Project->NewOplog("oplog2", {}); 
REQUIRE(OplogImport); CapturingJobContext Ctx; auto DoLoad = [&](bool Force, bool Clean) { LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .OptionalCache = nullptr, .CacheBuildId = Oid::Zero, .Oplog = *OplogImport, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = Force, .IgnoreMissingAttachments = false, .CleanOplog = Clean, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, .OptionalJobContext = &Ctx}); }; DoLoad(false, false); DoLoad(true, false); DoLoad(false, true); DoLoad(true, true); } // Populates ExportDir with a SaveOplog call using the same data as project.store.export. static std::shared_ptr SetupExportStore(CidStore& CidStore, ProjectStore::Project& Project, WorkerThreadPool& NetworkPool, WorkerThreadPool& WorkerPool, const std::filesystem::path& ExportDir) { using namespace projectstore_testutils; using namespace std::literals; Ref Oplog = Project.NewOplog("oplog_export", {}); if (!Oplog) { throw std::runtime_error("Failed to create oplog"); } Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{77}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{7123, 583, 690, 99}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{55, 122}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage( Oid::NewOid(), Project.RootDir, CreateFileAttachments(Project.RootDir, std::initializer_list{423 * 1024, 2 * 1024, 3213, 762 * 1024}))); FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = 32 * 
1024u, .ChunkFileSizeLimit = 64u * 1024u}, /*.FolderPath =*/ExportDir, /*.Name =*/std::string("oplog_export"), /*.OptionalBaseName =*/std::string(), /*.ForceDisableBlocks =*/false, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options); SaveOplog(Log(), CidStore, *RemoteStore, Project, *Oplog, NetworkPool, WorkerPool, Options.MaxBlockSize, Options.MaxChunksPerBlock, Options.MaxChunkEmbedSize, Options.ChunkFileSizeLimit, /*EmbedLooseFiles*/ true, /*ForceUpload*/ false, /*IgnoreMissingAttachments*/ false, /*OptionalContext*/ nullptr); return RemoteStore; } // Creates an export store with six 512 KB chunks packed into one ~3 MB block (MaxBlockSize=8 MB). // The ~1.5 MB slack exceeds the ChunkBlockAnalyser threshold, enabling partial-block downloads. // Uses its own GcManager/CidStore/ProjectStore so each call is independent. static std::shared_ptr SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, WorkerThreadPool& WorkerPool, const std::filesystem::path& ExportDir) { using namespace projectstore_testutils; using namespace std::literals; GcManager LocalGc; CidStore LocalCidStore(LocalGc); CidStoreConfiguration LocalCidConfig = {.RootDirectory = ExportDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; LocalCidStore.Initialize(LocalCidConfig); std::filesystem::path LocalProjectBasePath = ExportDir / "proj"; ProjectStore LocalProjectStore(LocalCidStore, LocalProjectBasePath, LocalGc, ProjectStore::Configuration{}); Ref LocalProject(LocalProjectStore.NewProject(LocalProjectBasePath / "p"sv, "p"sv, (ExportDir / "root").string(), (ExportDir / "engine").string(), (ExportDir / "game").string(), (ExportDir / "game" / "game.uproject").string())); Ref Oplog = LocalProject->NewOplog("oplog_partial_block", {}); if (!Oplog) { throw std::runtime_error("Failed to create oplog"); } Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list{512u * 1024u, 512u 
* 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u}, OodleCompressionLevel::None))); // MaxChunkEmbedSize must exceed 512 KB (compressed size with None encoding) or all chunks // become loose attachments and no block is created. FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize, .ChunkFileSizeLimit = 64u * 1024u * 1024u}, /*.FolderPath =*/ExportDir, /*.Name =*/std::string("oplog_partial_block"), /*.OptionalBaseName =*/std::string(), /*.ForceDisableBlocks =*/false, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options); SaveOplog(Log(), LocalCidStore, *RemoteStore, *LocalProject, *Oplog, NetworkPool, WorkerPool, Options.MaxBlockSize, Options.MaxChunksPerBlock, Options.MaxChunkEmbedSize, Options.ChunkFileSizeLimit, /*EmbedLooseFiles*/ true, /*ForceUpload*/ false, /*IgnoreMissingAttachments*/ false, /*OptionalContext*/ nullptr); return RemoteStore; } static IoHash FindBlockWithMultipleChunks(RemoteProjectStore& Store, size_t MinChunkCount) { RemoteProjectStore::LoadContainerResult ContainerResult = Store.LoadContainer(); if (ContainerResult.ErrorCode != 0) { return {}; } std::vector BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject); if (BlockHashes.empty()) { return {}; } RemoteProjectStore::GetBlockDescriptionsResult Descriptions = Store.GetBlockDescriptions(BlockHashes, nullptr, Oid{}); if (Descriptions.ErrorCode != 0) { return {}; } for (const ChunkBlockDescription& Desc : Descriptions.Blocks) { if (Desc.ChunkRawHashes.size() >= MinChunkCount) { return Desc.BlockHash; } } return {}; } // Seeds TargetCidStore with even-indexed chunks (0, 2, 4 ...) from BlockHash, leaving // odd chunks absent to create non-adjacent missing ranges for partial-block download tests. 
static void SeedCidStoreWithAlternateChunks(CidStore& TargetCidStore, RemoteProjectStore& Source, const IoHash& BlockHash) { RemoteProjectStore::LoadAttachmentResult BlockResult = Source.LoadAttachment(BlockHash); if (BlockResult.ErrorCode != 0 || !BlockResult.Bytes) { return; } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(BlockResult.Bytes), RawHash, RawSize); if (!Compressed) { return; } CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); if (!BlockPayload) { return; } uint32_t ChunkIndex = 0; uint64_t HeaderSize = 0; IterateChunkBlock( BlockPayload.Flatten(), [&TargetCidStore, &ChunkIndex](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { if (ChunkIndex % 2 == 0) { IoBuffer ChunkData = Chunk.GetCompressed().Flatten().AsIoBuffer(); TargetCidStore.AddChunk(ChunkData, AttachmentHash); } ++ChunkIndex; }, HeaderSize); } TEST_CASE("project.store.import.context_settings") { using namespace std::literals; using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; // Export-side CAS and project store; kept disjoint from the import side. 
GcManager ExportGc; CidStore ExportCidStore(ExportGc); CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; ExportCidStore.Initialize(ExportCidConfig); std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore"; ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{}); Ref ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore = SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path()); // Import-side CAS starts empty so the first import downloads from the remote store without ForceDownload. GcManager ImportGc; CidStore ImportCidStore(ImportGc); CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; ImportCidStore.Initialize(ImportCidConfig); std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore"; ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{}); Ref ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); const Oid CacheBuildId = Oid::NewOid(); BuildStorageCache::Statistics CacheStats; std::unique_ptr Cache = CreateInMemoryBuildStorageCache(256u, CacheStats); auto ResetCacheStats = [&]() { CacheStats.TotalBytesRead = 0; CacheStats.TotalBytesWritten = 0; CacheStats.TotalRequestCount = 0; CacheStats.TotalRequestTimeUs = 0; CacheStats.TotalExecutionTimeUs = 0; CacheStats.PeakSentBytes = 0; CacheStats.PeakReceivedBytes = 0; 
CacheStats.PeakBytesPerSec = 0; CacheStats.PutBlobCount = 0; CacheStats.PutBlobByteCount = 0; }; int OpJobIndex = 0; CapturingJobContext OpJobContext; // Each call creates a fresh oplog to prevent short-circuiting on already-present data. auto DoImport = [&](BuildStorageCache* OptCache, EPartialBlockRequestMode Mode, double StoreLatency, uint64_t StoreRanges, double CacheLatency, uint64_t CacheRanges, bool PopulateCache, bool ForceDownload) -> void { Ref ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = ImportCidStore, .RemoteStore = *RemoteStore, .OptionalCache = OptCache, .CacheBuildId = CacheBuildId, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = ForceDownload, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = Mode, .PopulateCache = PopulateCache, .StoreLatencySec = StoreLatency, .StoreMaxRangeCountPerRequest = StoreRanges, .CacheLatencySec = CacheLatency, .CacheMaxRangeCountPerRequest = CacheRanges, .OptionalJobContext = &OpJobContext}); }; // Shorthand: Mode=All, low latency, 128 ranges for both store and cache. auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) -> void { DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force); }; SUBCASE("mode_off_no_cache") { DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false); } SUBCASE("mode_all_multirange_cloud_no_cache") { // StoreMaxRangeCountPerRequest > 1 -> MultiRange cloud path. DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false); } SUBCASE("mode_all_singlerange_cloud_no_cache") { // StoreMaxRangeCountPerRequest == 1 -> SingleRange cloud path. 
DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false); } SUBCASE("mode_mixed_high_latency_no_cache") { // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache. DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false); } SUBCASE("cache_populate_and_hit") { // First import: CidStore empty -> blocks downloaded and written to cache. ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); CHECK(CacheStats.PutBlobCount > 0); // Re-import with Force=true: HasAttachment overridden, blocks served from cache. ResetCacheStats(); ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); CHECK(CacheStats.PutBlobCount == 0); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("cache_no_populate_flag") { ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false); CHECK(CacheStats.PutBlobCount == 0); } SUBCASE("mode_zencacheonly_cache_multirange") { // Pre-populate; re-import via ZenCacheOnly. All chunks needed -> FullBlockIndexes path (GetBuildBlob). ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("mode_zencacheonly_cache_singlerange") { // Pre-populate; re-import via ZenCacheOnly with CacheMaxRangeCountPerRequest=1. All chunks needed -> GetBuildBlob (full-blob). ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("mode_all_cache_and_cloud_multirange") { // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths. 
ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); ResetCacheStats(); ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("partial_block_cloud_multirange") { ScopedTemporaryDirectory PartialExportDir; std::shared_ptr PartialRemoteStore = SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in // ImportCidStore. Three non-adjacent needed positions -> three BlockRangeDescriptors. IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); // StoreMaxRangeCountPerRequest=128 -> all three ranges sent in one LoadAttachmentRanges call. Ref PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = nullptr, .CacheBuildId = CacheBuildId, .Oplog = *PartialOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::All, .PopulateCache = false, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 128u, .CacheLatencySec = -1.0, .CacheMaxRangeCountPerRequest = 0u, .OptionalJobContext = &OpJobContext}); } SUBCASE("partial_block_cloud_singlerange") { // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1. // DownloadPartialBlock issues one LoadAttachmentRanges call per range. 
ScopedTemporaryDirectory PartialExportDir; std::shared_ptr PartialRemoteStore = SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); Ref PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = nullptr, .CacheBuildId = CacheBuildId, .Oplog = *PartialOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::All, .PopulateCache = false, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 1u, .CacheLatencySec = -1.0, .CacheMaxRangeCountPerRequest = 0u, .OptionalJobContext = &OpJobContext}); } SUBCASE("partial_block_cache_multirange") { ScopedTemporaryDirectory PartialExportDir; std::shared_ptr PartialRemoteStore = SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); // Phase 1: full block download from remote populates the cache. 
{ Ref Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, .Oplog = *Phase1Oplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::All, .PopulateCache = true, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 128u, .CacheLatencySec = 0.001, .CacheMaxRangeCountPerRequest = 128u, .OptionalJobContext = &OpJobContext}); CHECK(CacheStats.PutBlobCount > 0); } ResetCacheStats(); // Phase 2: fresh CidStore with even chunks seeded; CacheMaxRangeCountPerRequest=128 -> GetBuildBlobRanges. GcManager Phase2Gc; CidStore Phase2CidStore(Phase2Gc); CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; Phase2CidStore.Initialize(Phase2CidConfig); SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); Ref Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = Phase2CidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, .Oplog = *Phase2Oplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, .PopulateCache = false, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 128u, .CacheLatencySec = 0.001, .CacheMaxRangeCountPerRequest = 128u, .OptionalJobContext = &OpJobContext}); CHECK(CacheStats.TotalRequestCount > 0); } SUBCASE("partial_block_cache_singlerange") { ScopedTemporaryDirectory PartialExportDir; 
std::shared_ptr PartialRemoteStore = SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path()); IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); CHECK(BlockHash != IoHash::Zero); // Phase 1: full block download from remote into cache. { Ref Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = ImportCidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, .Oplog = *Phase1Oplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::All, .PopulateCache = true, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 128u, .CacheLatencySec = 0.001, .CacheMaxRangeCountPerRequest = 128u, .OptionalJobContext = &OpJobContext}); CHECK(CacheStats.PutBlobCount > 0); } ResetCacheStats(); // Phase 2: CacheMaxRangeCountPerRequest=1 -> GetBuildBlob with range offset, called per needed range. 
GcManager Phase2Gc; CidStore Phase2CidStore(Phase2Gc); CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas_single", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; Phase2CidStore.Initialize(Phase2CidConfig); SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); Ref Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = Phase2CidStore, .RemoteStore = *PartialRemoteStore, .OptionalCache = Cache.get(), .CacheBuildId = CacheBuildId, .Oplog = *Phase2Oplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .CleanOplog = false, .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, .PopulateCache = false, .StoreLatencySec = 0.001, .StoreMaxRangeCountPerRequest = 128u, .CacheLatencySec = 0.001, .CacheMaxRangeCountPerRequest = 1u, .OptionalJobContext = &OpJobContext}); CHECK(CacheStats.TotalRequestCount > 0); } } static Ref MakeTestProject(CidStore& CidStore, GcManager& Gc, const std::filesystem::path& TempDir, std::unique_ptr& OutProjectStore) { using namespace std::literals; CidStoreConfiguration CidConfig = {.RootDirectory = TempDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; CidStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir / "projectstore"; OutProjectStore = std::make_unique(CidStore, BasePath, Gc, ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir / "root"; std::filesystem::path EngineRootDir = TempDir / "engine"; std::filesystem::path ProjectRootDir = TempDir / "game"; std::filesystem::path ProjectFilePath = TempDir / "game" / "game.uproject"; return Ref(OutProjectStore->NewProject(BasePath / "proj1"sv, "proj1"sv, RootDir.string(), EngineRootDir.string(), ProjectRootDir.string(), ProjectFilePath.string())); } static void RunSaveOplog(CidStore& CidStore, 
ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, WorkerThreadPool& NetworkPool, WorkerThreadPool& WorkerPool, const std::filesystem::path& ExportDir, const std::string& Name, size_t MaxBlockSize, size_t MaxChunksPerBlock, size_t MaxChunkEmbedSize, bool EmbedLooseFiles, bool ForceUpload, bool IgnoreMissingAttachments, JobContext* OptionalContext, bool ForceDisableBlocks, std::shared_ptr* OutRemoteStore = nullptr) { FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = MaxChunksPerBlock, .MaxChunkEmbedSize = MaxChunkEmbedSize, .ChunkFileSizeLimit = 64u * 1024u * 1024u}, /*.FolderPath =*/ExportDir, /*.Name =*/Name, /*.OptionalBaseName =*/std::string(), /*.ForceDisableBlocks =*/ForceDisableBlocks, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options); if (OutRemoteStore) { *OutRemoteStore = RemoteStore; } SaveOplog(Log(), CidStore, *RemoteStore, Project, Oplog, NetworkPool, WorkerPool, Options.MaxBlockSize, Options.MaxChunksPerBlock, Options.MaxChunkEmbedSize, Options.ChunkFileSizeLimit, EmbedLooseFiles, ForceUpload, IgnoreMissingAttachments, OptionalContext); } TEST_CASE("project.store.export.no_attachments_needed") { // With no binary attachments, UploadAttachments reports "No attachments needed". 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("oplog_no_att", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; CapturingJobContext Ctx; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_no_att", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/true, /*IgnoreMissingAttachments=*/false, &Ctx, /*ForceDisableBlocks=*/false); CHECK(Ctx.HasMessage("No attachments needed")); } TEST_CASE("project.store.embed_loose_files_true") { // EmbedLooseFiles=true: file-op entries are rewritten with a BinaryAttachment field. Round-trip must succeed. 
using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; Ref Oplog = Project->NewOplog("oplog_embed_true", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list{1024, 2048}))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_embed_true", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_embed_true_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.embed_loose_files_false" * doctest::skip()) // superseded by buildcontainer.embed_loose_files_false_no_rewrite { // EmbedLooseFiles=false: file-op entries pass through unrewritten. Round-trip must succeed. 
// Test body (this test case is decorated with doctest::skip()): same export -> import round-trip as the
// EmbedLooseFiles=true test, except RunSaveOplog is called with EmbedLooseFiles=false. No CHECKs follow the load.
using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; Ref Oplog = Project->NewOplog("oplog_embed_false", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list{1024, 2048}))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_embed_false", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/false, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_embed_false_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.missing_attachment_ignored" * doctest::skip()) // superseded by buildcontainer.ignore_missing_file_attachment_warn { using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512, 1024}); Ref Oplog = Project->NewOplog("oplog_missing_att", {}); REQUIRE(Oplog); 
// Register the files op, then remove every backing file from disk before exporting. The export runs with
// IgnoreMissingAttachments=true and a CapturingJobContext; the test then checks Ctx.HasMessage("Missing attachment").
Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); for (const auto& [Id, Path] : FileAtts) { std::filesystem::remove(Path); } TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; CapturingJobContext Ctx; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_missing_att", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/true, &Ctx, /*ForceDisableBlocks=*/false); CHECK(Ctx.HasMessage("Missing attachment")); } TEST_CASE("project.store.export.missing_chunk_in_cidstore" * doctest::skip()) // superseded by buildcontainer.ignore_missing_binary_attachment_warn/throw { using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); IoBuffer FakeData = CreateRandomBlob(256); IoHash FakeHash = IoHash::HashBuffer(FakeData); CbObjectWriter Object; Object << "key"sv << OidAsString(Oid::NewOid()); Object.BeginArray("bulkdata"sv); { Object.BeginObject(); Object << "id"sv << Oid::NewOid(); Object << "type"sv << "Standard"sv; Object.AddBinaryAttachment("data"sv, FakeHash); Object.EndObject(); } Object.EndArray(); CbPackage Package; Package.SetObject(Object.Save()); Ref Oplog = Project->NewOplog("oplog_missing_cid", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(Package); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; CHECK_THROWS(RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_missing_cid", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, 
// Final RunSaveOplog argument. The whole call is wrapped in CHECK_THROWS: the op references FakeHash as a binary
// attachment, and nothing visible in this test stores FakeData in CidStore, while IgnoreMissingAttachments=false.
/*ForceDisableBlocks=*/false)); } TEST_CASE("project.store.export.large_file_attachment_direct") { // File > 2 x MaxChunkEmbedSize: classified as a direct large attachment (no compression attempt). Round-trip must succeed. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; // 96 KB > 2 x 32 KB -> direct large attachment. auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{96u * 1024u}); Ref Oplog = Project->NewOplog("oplog_large_direct", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_large_direct", 64u * 1024u, 1000, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_large_direct_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.large_file_attachment_via_temp") { // File with MaxChunkEmbedSize < size <= 2xMaxChunkEmbedSize: compressed to a temp buffer; // if still large (incompressible), goes to OnLargeAttachment. Round-trip must succeed. 
// Test body: a single 48 KB file attachment is exported with MaxChunkEmbedSize = 32 KB (block size argument 64 KB,
// chunks-per-block argument 1000) and then imported into a fresh oplog. No CHECKs follow the load.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; // 48 KB: 32 KB < 48 KB <= 64 KB -> temp-compression path; incompressible data stays > 32 KB. auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{48u * 1024u}); Ref Oplog = Project->NewOplog("oplog_large_via_temp", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_large_via_temp", 64u * 1024u, 1000, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_large_via_temp_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.large_chunk_from_cidstore") { // Bulkdata attachment in CidStore with compressed size > MaxChunkEmbedSize -> OnLargeAttachment. Round-trip must succeed. 
// Test body: one 64 KB bulk-data attachment created with OodleCompressionLevel::None is exported with
// MaxChunkEmbedSize = 32 KB and then imported into a fresh oplog. No CHECKs follow the load.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); // 64 KB with None encoding -> compressed ~ 64 KB > MaxChunkEmbedSize = 32 KB. auto Attachments = CreateAttachments(std::initializer_list{64u * 1024u}, OodleCompressionLevel::None); Ref Oplog = Project->NewOplog("oplog_large_cid", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), Attachments)); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_large_cid", 64u * 1024u, 1000, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_large_cid_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.block_reuse") { // Second export to the same store: FindReuseBlocks matches existing blocks; no new blocks are written. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); // 20 KB with None encoding: compressed ~ 20 KB < MaxChunkEmbedSize = 32 KB -> goes into a block. 
// Two 20 KB attachments are exported once via RunSaveOplog and the known block hashes are recorded (REQUIRE: at
// least one block). The same oplog is then saved again with SaveOplog against the same RemoteStore, and the
// sorted block-hash lists from before and after the second save must compare equal.
Ref Oplog = Project->NewOplog("oplog_reuse", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list{20u * 1024u, 20u * 1024u}, OodleCompressionLevel::None))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunkEmbedSize = 32u * 1024u; constexpr size_t MaxBlockSize = 64u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_reuse", MaxBlockSize, 1000, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownAfterFirst = RemoteStore->GetKnownBlocks(); REQUIRE(!KnownAfterFirst.Blocks.empty()); std::vector BlockHashesAfterFirst; for (const ChunkBlockDescription& B : KnownAfterFirst.Blocks) { BlockHashesAfterFirst.push_back(B.BlockHash); } SaveOplog(Log(), CidStore, *RemoteStore, *Project, *Oplog, NetworkPool, WorkerPool, MaxBlockSize, 1000, MaxChunkEmbedSize, 64u * 1024u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr); RemoteProjectStore::GetKnownBlocksResult KnownAfterSecond = RemoteStore->GetKnownBlocks(); std::vector BlockHashesAfterSecond; for (const ChunkBlockDescription& B : KnownAfterSecond.Blocks) { BlockHashesAfterSecond.push_back(B.BlockHash); } std::sort(BlockHashesAfterFirst.begin(), BlockHashesAfterFirst.end()); std::sort(BlockHashesAfterSecond.begin(), BlockHashesAfterSecond.end()); CHECK(BlockHashesAfterFirst == BlockHashesAfterSecond); } TEST_CASE("project.store.export.max_chunks_per_block") { // MaxChunksPerBlock=2 with 3 attachments from one op -> at least 2 blocks produced. 
// Test body: three 2 KB attachments in a single op are exported with MaxChunksPerBlock = 2, MaxBlockSize = 1 MB and
// MaxChunkEmbedSize = 4 KB. The test checks that GetKnownBlocks() reports at least two blocks, then imports the
// result into a fresh oplog.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); // 2 KB with None encoding: compressed ~ 2 KB < MaxChunkEmbedSize = 4 KB -> enters block assembly. Ref Oplog = Project->NewOplog("oplog_max_chunks", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oid::NewOid(), CreateAttachments(std::initializer_list{2u * 1024u, 2u * 1024u, 2u * 1024u}, OodleCompressionLevel::None))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunksPerBlock = 2; constexpr size_t MaxBlockSize = 1u * 1024u * 1024u; constexpr size_t MaxChunkEmbedSize = 4u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_max_chunks", MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); CHECK(KnownBlocks.Blocks.size() >= 2); Ref ImportOplog = Project->NewOplog("oplog_max_chunks_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.max_data_per_block") { // Verifies ComposeBlocks respects UsableBlockSize = MaxBlockSize - MaxHeaderSize. // With MaxBlockSize=7168, MaxChunksPerBlock=32: MaxHeaderSize=129, UsableBlockSize=7039. 
// Setup below: five Oids are generated and sorted, and each is passed as the first argument of one
// CreateBulkDataOplogPackage call (Oids[0]..Oids[4]). Export uses MaxBlockSize = 7 KB, MaxChunksPerBlock = 32,
// MaxChunkEmbedSize = 3 KB.
// Oids[1] contributes 7041 compressed bytes (> 7039) to force a block boundary at that exact limit. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("oplog_max_data_per_block", {}); REQUIRE(Oplog); std::vector Oids; Oids.push_back(Oid::NewOid()); Oids.push_back(Oid::NewOid()); Oids.push_back(Oid::NewOid()); Oids.push_back(Oid::NewOid()); Oids.push_back(Oid::NewOid()); std::sort(Oids.begin(), Oids.end()); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oids[0], CreateAttachments(std::initializer_list{2u * 1024u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oids[1], CreateAttachments(std::initializer_list{3u * 1024u, 2u * 1024u, 2u * 1024u, 875u, 875u, 875u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oids[2], CreateAttachments(std::initializer_list{875u, 875u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( Oids[3], CreateAttachments(std::initializer_list{875u, 875u, 875u, 875u, 875u, 875u}, OodleCompressionLevel::None))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oids[4], CreateAttachments(std::initializer_list{1676, 1678}, OodleCompressionLevel::None))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; constexpr size_t MaxChunksPerBlock = 32; constexpr size_t MaxBlockSize = 7u * 1024u; constexpr size_t MaxChunkEmbedSize = 3u * 1024u; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_max_data_per_block", MaxBlockSize, MaxChunksPerBlock, MaxChunkEmbedSize, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, 
// Remaining RunSaveOplog arguments. RemoteStore is passed by address and dereferenced right after the call
// (presumably filled in by RunSaveOplog - confirm against its definition); the test then checks for at least
// two known blocks and imports the result.
/*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); CHECK(KnownBlocks.Blocks.size() >= 2); Ref ImportOplog = Project->NewOplog("oplog_max_data_per_block_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.export.file_deleted_between_phases") { // File exists during RewriteOp but is deleted before AllowChunking workers run. // With IgnoreMissingAttachments=true the export continues. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512, 1024}); Ref Oplog = Project->NewOplog("oplog_file_deleted", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); std::vector FilePaths; for (const auto& [Id, Path] : FileAtts) { FilePaths.push_back(Path); } // Deletes files when "Rewrote" arrives, before AllowChunking workers run. 
// The override first forwards the message to CapturingJobContext::ReportMessage, then removes every path in *Paths
// if the message text contains "Rewrote" and Paths is non-null. After the export the test checks for a
// "Missing attachment" message and that none of the recorded paths exist any more.
struct DeleteOnRewriteContext : public CapturingJobContext { std::vector* Paths = nullptr; void ReportMessage(std::string_view Message) override { CapturingJobContext::ReportMessage(Message); if (Message.find("Rewrote") != std::string_view::npos && Paths) { for (const auto& P : *Paths) { std::filesystem::remove(P); } } } }; TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; DeleteOnRewriteContext Ctx; Ctx.Paths = &FilePaths; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_file_deleted", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/true, &Ctx, /*ForceDisableBlocks=*/false); CHECK(Ctx.HasMessage("Missing attachment")); for (const auto& P : FilePaths) { CHECK(!std::filesystem::exists(P)); } } TEST_CASE("project.store.embed_loose_files_zero_data_hash") { // File-op entries with "data": IoHash::Zero (unresolved marker) trigger RewriteOp to // read from disk and replace with a resolved BinaryAttachment. 
// Test body: two files (512 and 1024 bytes) are wrapped with CreateFilesOplogPackageWithZeroDataHash, exported with
// EmbedLooseFiles=true and imported into a fresh oplog. No CHECKs follow the load.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512, 1024}); Ref Oplog = Project->NewOplog("oplog_zero_data_hash", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackageWithZeroDataHash(Oid::NewOid(), RootDir, FileAtts)); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_zero_data_hash", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); Ref ImportOplog = Project->NewOplog("oplog_zero_data_hash_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); } TEST_CASE("project.store.embed_loose_files_already_resolved") { // After an export->import round-trip, oplog entries carry resolved "data": BinaryAttachment(H). // A re-export must preserve those fields without re-reading from disk. 
// Test body: the first export goes to ExportDir1 and is imported into ImportOplog; ImportOplog is then exported
// again to ExportDir2. The second RunSaveOplog call passes no RemoteStore out-argument and nothing is checked
// after it, so the re-export is validated only by completing.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir1; ScopedTemporaryDirectory ExportDir2; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512, 1024}); Ref Oplog = Project->NewOplog("oplog_already_resolved", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore1; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir1.Path(), "oplog_already_resolved", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore1); Ref ImportOplog = Project->NewOplog("oplog_already_resolved_import", {}); LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore1, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed}); RunSaveOplog(CidStore, *Project, *ImportOplog, NetworkPool, WorkerPool, ExportDir2.Path(), "oplog_already_resolved_reexport", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/true, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false); } TEST_CASE("project.store.import.missing_attachment") { // Export a small oplog with ForceDisableBlocks=true (only loose .blob files), delete one // attachment, then test both sides of IgnoreMissingAttachments. 
// Export phase: two ops with two attachments each (512/1024 and 2048/3000 bytes) are exported with
// EmbedLooseFiles=false and ForceDisableBlocks=true; RemoteStore is kept for the import subcases that follow.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("oplog_missing_att", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{512, 1024}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{2048, 3000}))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_missing_att", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/false, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/true, &RemoteStore); // Find and delete one .blob attachment file from the remote store directory. 
// The first directory entry with a ".blob" extension found by the recursive walk is removed; REQUIRE aborts the
// test if none is found or if removal reports an error. Both subcases import with ForceDownload=true:
// "throws_when_not_ignored" expects RemoteStoreError, "succeeds_when_ignored" expects no throw and a
// "Failed to load attachments" message in the capturing context.
std::filesystem::path DeletedBlob; for (const auto& Entry : std::filesystem::recursive_directory_iterator(ExportDir.Path())) { if (Entry.path().extension() == ".blob") { DeletedBlob = Entry.path(); break; } } REQUIRE(!DeletedBlob.empty()); std::error_code Ec; std::filesystem::remove(DeletedBlob, Ec); REQUIRE(!Ec); SUBCASE("throws_when_not_ignored") { Ref ImportOplog = Project->NewOplog("oplog_missing_att_throw", {}); REQUIRE(ImportOplog); CapturingJobContext Ctx; CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = true, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, .OptionalJobContext = &Ctx}), RemoteStoreError); } SUBCASE("succeeds_when_ignored") { Ref ImportOplog = Project->NewOplog("oplog_missing_att_ignore", {}); REQUIRE(ImportOplog); CapturingJobContext Ctx; CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = true, .IgnoreMissingAttachments = true, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, .OptionalJobContext = &Ctx})); CHECK(Ctx.HasMessage("Failed to load attachments")); } } TEST_CASE("project.store.import.error.load_container_failure") { // LoadContainer() on a nonexistent path returns non-zero ErrorCode -> LoadOplog throws RemoteStoreError. 
// Test body: a file remote store is created with FolderPath = TempDir/"does_not_exist" (a path this test never
// creates explicitly and never exports to), and LoadOplog on it is expected to throw RemoteStoreError.
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path NonExistentPath = TempDir.Path() / "does_not_exist"; FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024u, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = 32u * 1024u, .ChunkFileSizeLimit = 64u * 1024u * 1024u}, /*.FolderPath =*/NonExistentPath, /*.Name =*/"load_container_failure", /*.OptionalBaseName =*/std::string(), /*.ForceDisableBlocks =*/false, /*.ForceEnableTempBlocks =*/false}; std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; Ref ImportOplog = Project->NewOplog("load_container_failure_import", {}); REQUIRE(ImportOplog); CapturingJobContext Ctx; CHECK_THROWS_AS(LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = false, .IgnoreMissingAttachments = false, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, .OptionalJobContext = &Ctx}), RemoteStoreError); } TEST_CASE("project.store.blockcomposer.path_a_standalone_block") { // Path A: one op with exactly MaxChunksPerBlock chunks -> emitted as a standalone block without merging into pending. 
// Test body: all four 100-byte chunks are keyed by Op1 with MaxChunks = 4. Each emitted block is collected by the
// callback; exactly one block of four hashes is required, and its first and last hashes are checked.
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4)}; std::vector Sizes = {100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op1, Op1}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 4); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][3] == MakeTestHash(4)); } TEST_CASE("project.store.blockcomposer.path_b_fits_pending") { // Path B: a single op whose chunks fit in the empty pending block. // No flush occurs during processing; the final flush emits the one pending block. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2)}; std::vector Sizes = {60, 80}; // each <= MaxChunkEmbedSize (100); sum=140 << UsableSize (1000) std::vector Keys = {Op1, Op1}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 2); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); } TEST_CASE("project.store.blockcomposer.path_b_exact_count_fill") { // Path B: pending reaches MaxChunksPerBlock exactly -> immediate flush, no separate final flush. 
// Test body: Op1 and Op2 each own two 100-byte chunks with MaxChunks = 4. Exactly one block of four hashes is
// required, and its first and last hashes are checked.
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4)}; std::vector Sizes = {100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 4); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][3] == MakeTestHash(4)); } TEST_CASE("project.store.blockcomposer.path_c_75pct_flush") { // Path C: pending is >75% full when the next op doesn't fit -> pending flushed first, new op placed via Path B. // UsableSize=100, threshold=75 bytes; Op1=80 bytes > 75%. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 100; // 75% threshold = 75 bytes constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); // Op1: 80 bytes -> Path B, pending = {80 bytes, 1 chunk} (80 > 75) // Op2: 30 bytes -> does not fit (80+30=110 > 100) and 80 > 75 -> Path C flush, // then Path B, pending = {30 bytes} -> final flush std::vector Hashes = {MakeTestHash(1), MakeTestHash(2)}; std::vector Sizes = {80, 30}; std::vector Keys = {Op1, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 1); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[1].size() == 1); CHECK(Blocks[1][0] == MakeTestHash(2)); } TEST_CASE("project.store.blockcomposer.path_d_partial_fill") { // Path D: pending <=75% full but chunk count is the binding constraint. Greedy fill adds chunks until count capacity, then flushes. 
// Test body: five 100-byte chunks, three keyed by Op1 and two by Op2, with MaxChunks = 4.
// Asserted result: block 0 holds hashes 1..4 in order, block 1 holds hash 5.
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); // Op1: 3 x 100 bytes -> Path B, pending = {3 chunks, 300 bytes} (300 <= 750) // Op2: 2 x 100 bytes -> 3+2=5 > MaxChunks=4; 300+200=500 <= 1000; 300 <= 750 -> Path D // D adds op2[0] to pending (4 chunks, count capacity reached), flushes -> block 1 // Remaining op2[1] -> Path B (pending empty) -> final flush -> block 2 std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5)}; std::vector Sizes = {100, 100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op1, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 4); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); CHECK(Blocks[0][2] == MakeTestHash(3)); CHECK(Blocks[0][3] == MakeTestHash(4)); CHECK(Blocks[1].size() == 1); CHECK(Blocks[1][0] == MakeTestHash(5)); } TEST_CASE("project.store.blockcomposer.cancellation") { // IsCancelledFunc returns true on the second outer-loop iteration. // Op1 (4 chunks, Path A) is fully emitted before cancellation; Op2 is never started. 
// Test body: IsCancelledFunc is a lambda that pre-increments CallCount and returns true once the count exceeds 1,
// i.e. from its second invocation onward. Only one block (four hashes) is required from the six input chunks.
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; int CallCount = 0; remotestore_impl::BlockComposer::Configuration Config = MakeTestConfig(UsableSize, MaxChunks); Config.IsCancelledFunc = [&]() { return ++CallCount > 1; }; remotestore_impl::BlockComposer Composer(Config); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)}; std::vector Sizes = {100, 100, 100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op1, Op1, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 4); } TEST_CASE("project.store.blockcomposer.final_flush") { // Three ops with all chunks fitting in pending (no mid-stream flush) -> single block from final flush. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); Oid Op3 = MakeTestOid(3); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)}; std::vector Sizes = {60, 80, 70}; // each <= MaxChunkEmbedSize (100); sum=210 << UsableSize (1000) std::vector Keys = {Op1, Op2, Op3}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 3); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); CHECK(Blocks[0][2] == MakeTestHash(3)); } TEST_CASE("project.store.blockcomposer.path_b_b_c") { // Path B -> Path B -> Path C: two ops accumulate past 75% threshold; third op triggers Path C flush. // UsableSize=200, threshold=150; two ops of 90 bytes each accumulate 180 bytes, exceeding threshold. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes constexpr uint64_t MaxChunks = 8; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); Oid Op3 = MakeTestOid(3); // Op1: 90 bytes -> Path B, pending = {90 bytes, 1 chunk} (90 <= 150) // Op2: 90 bytes -> Path B, pending = {180 bytes, 2 chunks} (180 > 150) // Op3: 60 bytes -> does not fit (180+60=240 > 200) and 180 > 150 -> Path C flush -> block 1 // then Path B, pending = {60 bytes} -> final flush -> block 2 std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)}; std::vector Sizes = {90, 90, 60}; std::vector Keys = {Op1, Op2, Op3}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 2); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); CHECK(Blocks[1].size() == 1); CHECK(Blocks[1][0] == MakeTestHash(3)); } TEST_CASE("project.store.blockcomposer.path_a_b_final_flush") { // Path A -> Path B -> final flush: first op count-saturates -> standalone block, second op placed via Path B. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); // Op1: 4 x 100 bytes -> MaxChunksPerBlock reached -> CurrentOpFillFullBlock=true -> Path A // Op2: 2 x 100 bytes -> Path B (pending empty) -> final flush std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)}; std::vector Sizes = {100, 100, 100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op1, Op1, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 4); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][3] == MakeTestHash(4)); CHECK(Blocks[1].size() == 2); CHECK(Blocks[1][0] == MakeTestHash(5)); CHECK(Blocks[1][1] == MakeTestHash(6)); } TEST_CASE("project.store.blockcomposer.empty_input") { // Zero attachments -> no blocks emitted. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); std::vector> Blocks; Composer.Compose({}, {}, {}, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); CHECK(Blocks.empty()); } TEST_CASE("project.store.blockcomposer.single_attachment") { // Single chunk -> Path B into empty pending, final flush emits it. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); std::vector Hashes = {MakeTestHash(1)}; std::vector Sizes = {60}; std::vector Keys = {Op1}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 1); CHECK(Blocks[0][0] == MakeTestHash(1)); } TEST_CASE("project.store.blockcomposer.path_a_size_saturation") { // Path A by size overflow: 60+60 > UsableSize=100; first chunk emitted standalone, second via Path B. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 100; // MaxChunkEmbedSize=100; two 60-byte chunks overflow constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); // chunk0=60, chunk1=60: 60+60=120 > UsableSize=100 -> size overflow after gathering chunk0 std::vector Hashes = {MakeTestHash(1), MakeTestHash(2)}; std::vector Sizes = {60, 60}; std::vector Keys = {Op1, Op1}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 1); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[1].size() == 1); CHECK(Blocks[1][0] == MakeTestHash(2)); } TEST_CASE("project.store.blockcomposer.path_b_exact_size_fill") { // Path B immediate flush when pending reaches UsableBlockSize exactly (vs count-fill in path_b_exact_count_fill). 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 100; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); // Op1: 60 bytes -> Path B, pending = {60 bytes, 1 chunk} // Op2: 40 bytes -> 60+40=100 == UsableSize -> Path B, immediate size-exact flush std::vector Hashes = {MakeTestHash(1), MakeTestHash(2)}; std::vector Sizes = {60, 40}; std::vector Keys = {Op1, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 1); CHECK(Blocks[0].size() == 2); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); } TEST_CASE("project.store.blockcomposer.path_d_size_limited_greedy") { // Path D where greedy fill is limited by size (not count). MaxChunks=8 ensures size is binding. using namespace projectstore_testutils; constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes constexpr uint64_t MaxChunks = 8; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4)}; std::vector Sizes = {90, 60, 60, 60}; std::vector Keys = {Op1, Op2, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 2); CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][1] == MakeTestHash(2)); CHECK(Blocks[1].size() == 2); CHECK(Blocks[1][0] == MakeTestHash(3)); CHECK(Blocks[1][1] == MakeTestHash(4)); } TEST_CASE("project.store.blockcomposer.path_a_pending_untouched") { // Path A leaves pending untouched: Op1 in pending, Op2 count-saturates -> standalone block. Final flush emits Op1. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); // Op1: 2 x 60 bytes -> Path B, pending = {2 chunks, 120 bytes} // Op2: 4 x 100 bytes -> count reaches MaxChunks=4 -> CurrentOpFillFullBlock=true -> Path A // Path A emits Op2 standalone as block 1; pending (Op1's chunks) is left untouched. // Final flush emits pending -> block 2. std::vector Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)}; std::vector Sizes = {60, 60, 100, 100, 100, 100}; std::vector Keys = {Op1, Op1, Op2, Op2, Op2, Op2}; std::vector> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector&& B) { Blocks.push_back(std::move(B)); }); REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 4); CHECK(Blocks[0][0] == MakeTestHash(3)); CHECK(Blocks[0][3] == MakeTestHash(6)); CHECK(Blocks[1].size() == 2); CHECK(Blocks[1][0] == MakeTestHash(1)); CHECK(Blocks[1][1] == MakeTestHash(2)); } // --------------------------------------------------------------------------- // BuildContainer-direct tests // --------------------------------------------------------------------------- TEST_CASE("buildcontainer.public_overload_smoke") { // Verifies the public BuildContainer overload runs successfully and calls AsyncOnBlock. 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_smoke", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024}))); WorkerThreadPool WorkerPool(GetWorkerCount()); std::atomic BlockCallCount{0}; CbObject Container = BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [&](CompressedBuffer&&, ChunkBlockDescription&&) { BlockCallCount.fetch_add(1); }, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false); CHECK(Container.GetSize() > 0); CHECK(BlockCallCount.load() >= 1); } TEST_CASE("buildcontainer.build_blocks_false_on_block_chunks") { // BuildBlocks=false: small attachments go to OnBlockChunks instead of AsyncOnBlock. 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_no_blocks", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024, 1024}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024, 1024}))); WorkerThreadPool WorkerPool(GetWorkerCount()); std::atomic BlockChunksCallCount{0}; CbObject Container = BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/false, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) { CHECK(false); }, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [&](std::vector>&&) { BlockChunksCallCount.fetch_add(1); }, /*EmbedLooseFiles=*/false); CHECK(Container.GetSize() > 0); CHECK(BlockChunksCallCount.load() >= 1); } TEST_CASE("buildcontainer.ignore_missing_binary_attachment_warn") { // A bulk-data op references a hash that is absent from CidStore. // SUBCASE warn: IgnoreMissingAttachments=true -> ReportMessage("Missing attachment ..."). // SUBCASE throw: IgnoreMissingAttachments=false -> std::runtime_error. using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); // Fabricate a hash not in CidStore and build a package that references it as a // BinaryAttachment field but carries no inline attachment data. 
IoBuffer FakeData = CreateRandomBlob(256); IoHash FakeHash = IoHash::HashBuffer(FakeData); CbObjectWriter Object; Object << "key"sv << OidAsString(Oid::NewOid()); Object.BeginArray("bulkdata"sv); { Object.BeginObject(); Object << "id"sv << Oid::NewOid(); Object << "type"sv << "Standard"sv; Object.AddBinaryAttachment("data"sv, FakeHash); Object.EndObject(); } Object.EndArray(); CbPackage Package; Package.SetObject(Object.Save()); Ref Oplog = Project->NewOplog("bc_missing_bin", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(Package); WorkerThreadPool WorkerPool(GetWorkerCount()); SUBCASE("warn") { CapturingJobContext Ctx; BuildContainer( Log(), CidStore, *Project, *Oplog, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/true, /*AllowChunking=*/true, {}, WorkerPool, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false, &Ctx); CHECK(Ctx.HasMessage("Missing attachment")); } SUBCASE("throw") { CHECK_THROWS(BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false)); } } TEST_CASE("buildcontainer.ignore_missing_file_attachment_warn") { // File attachments are created on disk then deleted before BuildContainer runs. // SUBCASE warn: IgnoreMissingAttachments=true -> ReportMessage("Missing attachment ..."). // SUBCASE throw: IgnoreMissingAttachments=false -> exception. 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512, 1024}); Ref Oplog = Project->NewOplog("bc_missing_file", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); // Delete files before BuildContainer runs so RewriteOp finds them missing. for (const auto& [Id, Path] : FileAtts) { std::filesystem::remove(Path); } WorkerThreadPool WorkerPool(GetWorkerCount()); SUBCASE("warn") { CapturingJobContext Ctx; BuildContainer( Log(), CidStore, *Project, *Oplog, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/true, /*AllowChunking=*/true, {}, WorkerPool, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/true, &Ctx); CHECK(Ctx.HasMessage("Missing attachment")); } SUBCASE("throw") { CHECK_THROWS(BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/true)); } } TEST_CASE("buildcontainer.zero_byte_file_attachment") { // A zero-byte file on disk is a valid attachment. BuildContainer must process // it without hitting ZEN_ASSERT(UploadAttachment->Size != 0) in // ResolveAttachments. The empty file flows through the compress-inline path // and becomes a LooseUploadAttachment with raw size 0. 
using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; auto FileAtts = CreateFileAttachments(RootDir, std::initializer_list{512}); Ref Oplog = Project->NewOplog("bc_zero_byte_file", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateFilesOplogPackage(Oid::NewOid(), RootDir, FileAtts)); // Truncate the file to zero bytes after the oplog entry is created. // The file still exists on disk so RewriteOplog's IsFile() check passes, // but MakeFromFile returns a zero-size buffer. std::filesystem::resize_file(FileAtts[0].second, 0); WorkerThreadPool WorkerPool(GetWorkerCount()); CbObject Container = BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/true); CHECK(Container.GetSize() > 0); // The zero-byte attachment is packed into a block via the compress-inline path. CbArrayView Blocks = Container["blocks"sv].AsArrayView(); CHECK(Blocks.Num() > 0); } TEST_CASE("buildcontainer.embed_loose_files_false_no_rewrite") { // EmbedLooseFiles=false: RewriteOp is skipped for file-op entries; they pass through // unchanged. Neither AsyncOnBlock nor OnLargeAttachment should fire. 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); std::filesystem::path RootDir = TempDir.Path() / "root"; Ref Oplog = Project->NewOplog("bc_embed_false", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateFilesOplogPackage(Oid::NewOid(), RootDir, CreateFileAttachments(RootDir, std::initializer_list{1024, 2048}))); WorkerThreadPool WorkerPool(GetWorkerCount()); CbObject Container = BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) { CHECK(false); }, [](const IoHash&, TGetAttachmentBufferFunc&&) { CHECK(false); }, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false); CHECK(Container.GetSize() > 0); } TEST_CASE("buildcontainer.allow_chunking_false") { // AllowChunking=false: attachments exceeding ChunkFileSizeLimit skip chunking -> OnLargeAttachment. // AllowChunking=true: same data is chunked, but chunk still exceeds MaxChunkEmbedSize -> OnLargeAttachment; // exercises the AllowChunking branch in FindChunkSizes. // 4 KB attachment: > MaxChunkEmbedSize (2 KB) and > ChunkFileSizeLimit (1 KB). using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); // None encoding: compressed ~ 4 KB > MaxChunkEmbedSize (2 KB) and ChunkFileSizeLimit (1 KB). 
Ref Oplog = Project->NewOplog("bc_allow_chunk", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{4096}, OodleCompressionLevel::None))); WorkerThreadPool WorkerPool(GetWorkerCount()); constexpr size_t TestMaxBlockSize = 16u * 1024u; constexpr size_t TestMaxChunkEmbedSize = 2u * 1024u; constexpr size_t TestChunkFileSizeLimit = 1u * 1024u; SUBCASE("allow_chunking_false") { std::atomic LargeAttachmentCallCount{0}; BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, TestMaxBlockSize, 1000, TestMaxChunkEmbedSize, TestChunkFileSizeLimit, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/false, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [&](const IoHash&, TGetAttachmentBufferFunc&&) { LargeAttachmentCallCount.fetch_add(1); }, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false); CHECK(LargeAttachmentCallCount.load() >= 1); } SUBCASE("allow_chunking_true") { // Chunking branch in FindChunkSizes is taken, but the ~4 KB chunk still exceeds MaxChunkEmbedSize -> OnLargeAttachment. std::atomic LargeAttachmentCallCount{0}; BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, TestMaxBlockSize, 1000, TestMaxChunkEmbedSize, TestChunkFileSizeLimit, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [&](const IoHash&, TGetAttachmentBufferFunc&&) { LargeAttachmentCallCount.fetch_add(1); }, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false); CHECK(LargeAttachmentCallCount.load() >= 1); } } TEST_CASE("buildcontainer.async_on_block_exception_propagates") { // If AsyncOnBlock throws, the exception must propagate out of BuildContainer. 
using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_block_exc", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024, 1024}))); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024, 1024}))); WorkerThreadPool WorkerPool(GetWorkerCount()); CHECK_THROWS_AS(BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, [](CompressedBuffer&&, ChunkBlockDescription&&) { throw std::runtime_error("inject_block"); }, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false), std::runtime_error); } TEST_CASE("buildcontainer.on_large_attachment_exception_propagates") { // OnLargeAttachment exception must propagate. 64 KB with MaxChunkEmbedSize=32 KB -> OnLargeAttachment. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_large_exc", {}); REQUIRE(Oplog); // 64 KB with OodleCompressionLevel::None -> compressed ~ 64 KB > MaxChunkEmbedSize (32 KB). 
Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{64u * 1024u}, OodleCompressionLevel::None))); WorkerThreadPool WorkerPool(GetWorkerCount()); CHECK_THROWS_AS(BuildContainer( Log(), CidStore, *Project, *Oplog, WorkerPool, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/false, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) { throw std::runtime_error("inject_large"); }, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false), std::runtime_error); } TEST_CASE("buildcontainer.context_cancellation_aborts") { // IsCancelled() returns true from the start; BuildContainer must not crash or throw. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_cancel", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024}))); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024}))); WorkerThreadPool WorkerPool(GetWorkerCount()); CapturingJobContext Ctx; Ctx.m_Cancel = true; CHECK_NOTHROW(BuildContainer( Log(), CidStore, *Project, *Oplog, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, {}, WorkerPool, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false, &Ctx)); } TEST_CASE("buildcontainer.context_progress_reporting") { // BuildContainer calls 
ReportProgress at least once ("Scanning oplog"). using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("bc_progress", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024}))); WorkerThreadPool WorkerPool(GetWorkerCount()); CapturingJobContext Ctx; BuildContainer( Log(), CidStore, *Project, *Oplog, 64u * 1024u, 1000, 32u * 1024u, 64u * 1024u * 1024u, /*BuildBlocks=*/true, /*IgnoreMissingAttachments=*/false, /*AllowChunking=*/true, {}, WorkerPool, [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector>&&) {}, /*EmbedLooseFiles=*/false, &Ctx); CHECK(!Ctx.ProgressMessages.empty()); } TEST_CASE("getblocksfromoplog.filtered") { // GetBlocksFromOplog(ContainerObject, IncludeBlockHashes) returns only the requested blocks. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore = SetupExportStore(CidStore, *Project, NetworkPool, WorkerPool, ExportDir.Path()); RemoteProjectStore::LoadContainerResult ContainerResult = RemoteStore->LoadContainer(); REQUIRE(ContainerResult.ErrorCode == 0); std::vector AllBlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject); REQUIRE(!AllBlockHashes.empty()); // Filter to the first block only. 
std::vector Subset = {AllBlockHashes[0]}; std::vector Filtered = GetBlocksFromOplog(ContainerResult.ContainerObject, Subset); CHECK(Filtered.size() == 1); CHECK(Filtered[0].BlockHash == AllBlockHashes[0]); CHECK(!Filtered[0].ChunkRawHashes.empty()); // Empty include set returns empty result (exercises the no-match branch). std::vector Empty = GetBlocksFromOplog(ContainerResult.ContainerObject, std::span{}); CHECK(Empty.empty()); } // --------------------------------------------------------------------------- // SaveOplog-focused tests // --------------------------------------------------------------------------- TEST_CASE("saveoplog.cancellation") { // IsCancelled() returns true from the start; SaveOplog must not throw. using namespace projectstore_testutils; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("oplog_cancel_save", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 2048}))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; CapturingJobContext Ctx; Ctx.m_Cancel = true; CHECK_NOTHROW(RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_cancel_save", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/false, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, &Ctx, /*ForceDisableBlocks=*/false)); } // --------------------------------------------------------------------------- // LoadOplog-focused tests // --------------------------------------------------------------------------- TEST_CASE("loadoplog.missing_block_attachment_ignored") { // Export creates a block file; deleting it then loading with IgnoreMissingAttachments=true // must succeed and report 
the failure via "Failed to download block attachment". using namespace projectstore_testutils; using namespace std::literals; ScopedTemporaryDirectory TempDir; ScopedTemporaryDirectory ExportDir; GcManager Gc; CidStore CidStore(Gc); std::unique_ptr ProjectStoreDummy; Ref Project = MakeTestProject(CidStore, Gc, TempDir.Path(), ProjectStoreDummy); Ref Oplog = Project->NewOplog("oplog_missing_block", {}); REQUIRE(Oplog); Oplog->AppendNewOplogEntry( CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list{1024, 1024, 2048, 512}))); TestWorkerPools Pools; WorkerThreadPool& NetworkPool = Pools.NetworkPool; WorkerThreadPool& WorkerPool = Pools.WorkerPool; std::shared_ptr RemoteStore; RunSaveOplog(CidStore, *Project, *Oplog, NetworkPool, WorkerPool, ExportDir.Path(), "oplog_missing_block", 64u * 1024u, 1000, 32u * 1024u, /*EmbedLooseFiles=*/false, /*ForceUpload=*/false, /*IgnoreMissingAttachments=*/false, /*OptionalContext=*/nullptr, /*ForceDisableBlocks=*/false, &RemoteStore); RemoteProjectStore::GetKnownBlocksResult KnownBlocks = RemoteStore->GetKnownBlocks(); REQUIRE(KnownBlocks.ErrorCode == 0); REQUIRE(!KnownBlocks.Blocks.empty()); for (const ChunkBlockDescription& BlockDesc : KnownBlocks.Blocks) { std::string HexStr = BlockDesc.BlockHash.ToHexString(); std::filesystem::path BlockPath = ExportDir.Path() / HexStr.substr(0, 3) / HexStr.substr(3, 2) / (HexStr.substr(5) + ".blob"); REQUIRE(std::filesystem::exists(BlockPath)); std::filesystem::remove(BlockPath); } CapturingJobContext Ctx; Ref ImportOplog = Project->NewOplog("oplog_missing_block_import", {}); CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(), .ChunkStore = CidStore, .RemoteStore = *RemoteStore, .Oplog = *ImportOplog, .NetworkWorkerPool = NetworkPool, .WorkerPool = WorkerPool, .ForceDownload = true, .IgnoreMissingAttachments = true, .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, .OptionalJobContext = &Ctx})); CHECK(Ctx.HasMessage("Failed to download block 
attachment"));
}

// Imports the same exported oplog twice through one in-memory build-storage cache.
// Phase 1 loads with PopulateCache=true; phase 2 loads into a second oplog with
// ForceDownload=true and CleanOplog=true. The only assertion in this test is that the
// phase 2 LoadOplog call does not throw.
TEST_CASE("loadoplog.clean_oplog_with_populated_cache")
{
	// Second import with CleanOplog=true and a non-null cache exercises the OptionalCache->Flush() path.
	using namespace projectstore_testutils;
	using namespace std::literals;

	// TempDir hosts both CAS roots and both project stores; ExportDir is handed to SetupExportStore.
	ScopedTemporaryDirectory TempDir;
	ScopedTemporaryDirectory ExportDir;

	// Project paths shared by the export-side and import-side NewProject calls.
	std::filesystem::path RootDir = TempDir.Path() / "root";
	std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
	std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
	std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";

	// Export side.
	GcManager ExportGc;
	CidStore ExportCidStore(ExportGc);
	CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas",
											 .TinyValueThreshold = 1024,
											 .HugeValueThreshold = 4096};
	ExportCidStore.Initialize(ExportCidConfig);

	std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore";
	ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{});
	Ref ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv,
													"proj1"sv,
													RootDir.string(),
													EngineRootDir.string(),
													ProjectRootDir.string(),
													ProjectFilePath.string()));

	TestWorkerPools Pools;
	WorkerThreadPool& NetworkPool = Pools.NetworkPool;
	WorkerThreadPool& WorkerPool = Pools.WorkerPool;

	// NOTE(review): SetupExportStore is defined outside this view; presumably it fills an oplog on
	// ExportProject and exports it to a remote store rooted at ExportDir - confirm against the helper.
	std::shared_ptr RemoteStore = SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path());

	// Import side, starts empty.
	GcManager ImportGc;
	CidStore ImportCidStore(ImportGc);
	CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas",
											 .TinyValueThreshold = 1024,
											 .HugeValueThreshold = 4096};
	ImportCidStore.Initialize(ImportCidConfig);

	std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore";
	ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{});
	Ref ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv,
													"proj1"sv,
													RootDir.string(),
													EngineRootDir.string(),
													ProjectRootDir.string(),
													ProjectFilePath.string()));

	// One cache instance and one build id are shared by both import phases below.
	// NOTE(review): the meaning of the 256u argument is not visible from here - confirm against
	// CreateInMemoryBuildStorageCache.
	const Oid CacheBuildId = Oid::NewOid();
	BuildStorageCache::Statistics CacheStats;
	std::unique_ptr Cache = CreateInMemoryBuildStorageCache(256u, CacheStats);

	{
		// Phase 1: plain import with the cache attached and PopulateCache enabled. The call is not
		// wrapped in a CHECK; no assertion is made on its outcome.
		Ref Phase1Oplog = ImportProject->NewOplog("oplog_clean_cache_p1", {});
		LoadOplog(LoadOplogContext{.Log = Log(),
								   .ChunkStore = ImportCidStore,
								   .RemoteStore = *RemoteStore,
								   .OptionalCache = Cache.get(),
								   .CacheBuildId = CacheBuildId,
								   .Oplog = *Phase1Oplog,
								   .NetworkWorkerPool = NetworkPool,
								   .WorkerPool = WorkerPool,
								   .ForceDownload = false,
								   .IgnoreMissingAttachments = false,
								   .CleanOplog = false,
								   .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
								   .PopulateCache = true});
	}

	{
		// Phase 2: import into a second oplog with the same cache, now with ForceDownload and
		// CleanOplog enabled and PopulateCache disabled. Only the absence of an exception is checked.
		Ref Phase2Oplog = ImportProject->NewOplog("oplog_clean_cache_p2", {});
		CHECK_NOTHROW(LoadOplog(LoadOplogContext{.Log = Log(),
												 .ChunkStore = ImportCidStore,
												 .RemoteStore = *RemoteStore,
												 .OptionalCache = Cache.get(),
												 .CacheBuildId = CacheBuildId,
												 .Oplog = *Phase2Oplog,
												 .NetworkWorkerPool = NetworkPool,
												 .WorkerPool = WorkerPool,
												 .ForceDownload = true,
												 .IgnoreMissingAttachments = false,
												 .CleanOplog = true,
												 .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
												 .PopulateCache = false}));
	}
}

// Exports the same oplog twice to one file-based remote store, then imports it into an empty CAS
// and checks that every attachment hash referenced by the source oplog is present afterwards.
// Also checks that the stored container lists no block hash more than once.
TEST_CASE("project.store.export.block_reuse_fresh_receiver")
{
	// Regression test: after a second export that reuses existing blocks, a fresh import must still
	// receive all chunks. The bug: FindReuseBlocks erases reused-block chunks from UploadAttachments,
	// but never adds the reused blocks to the container's "blocks" section. A fresh receiver then
	// silently misses those chunks because ParseOplogContainer never sees them.
	using namespace projectstore_testutils;
	using namespace std::literals;

	// TempDir hosts both CAS roots and both project stores; ExportDir is the file remote store folder.
	ScopedTemporaryDirectory TempDir;
	ScopedTemporaryDirectory ExportDir;

	// -- Export side ----------------------------------------------------------
	GcManager ExportGc;
	CidStore ExportCidStore(ExportGc);
	CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas",
											 .TinyValueThreshold = 1024,
											 .HugeValueThreshold = 4096};
	ExportCidStore.Initialize(ExportCidConfig);

	std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore";
	ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{});

	// Project paths shared by the export-side and import-side NewProject calls.
	std::filesystem::path RootDir = TempDir.Path() / "root";
	std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
	std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
	std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";

	Ref ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv,
													"proj1"sv,
													RootDir.string(),
													EngineRootDir.string(),
													ProjectRootDir.string(),
													ProjectFilePath.string()));

	// 20 KB with None encoding: compressed ~ 20 KB < MaxChunkEmbedSize (32 KB) -> packed into blocks.
	Ref Oplog = ExportProject->NewOplog("oplog_reuse_rt", {});
	REQUIRE(Oplog);
	// A single oplog entry carrying two attachments of 20 KiB each.
	Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
		Oid::NewOid(),
		CreateAttachments(std::initializer_list{20u * 1024u, 20u * 1024u}, OodleCompressionLevel::None)));

	TestWorkerPools Pools;
	WorkerThreadPool& NetworkPool = Pools.NetworkPool;
	WorkerThreadPool& WorkerPool = Pools.WorkerPool;

	// The same limits are passed to the remote store options and to both SaveOplog calls.
	constexpr size_t MaxBlockSize = 64u * 1024u;
	constexpr size_t MaxChunksPerBlock = 1000;
	constexpr size_t MaxChunkEmbedSize = 32u * 1024u;
	constexpr size_t ChunkFileSizeLimit = 64u * 1024u * 1024u;

	// First export: creates blocks on disk.
	FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize,
														 .MaxChunksPerBlock = MaxChunksPerBlock,
														 .MaxChunkEmbedSize = MaxChunkEmbedSize,
														 .ChunkFileSizeLimit = ChunkFileSizeLimit},
									  /*.FolderPath =*/ExportDir.Path(),
									  /*.Name =*/std::string("oplog_reuse_rt"),
									  /*.OptionalBaseName =*/std::string(),
									  /*.ForceDisableBlocks =*/false,
									  /*.ForceEnableTempBlocks =*/false};

	std::shared_ptr RemoteStore = CreateFileRemoteStore(Log(), Options);

	SaveOplog(Log(),
			  ExportCidStore,
			  *RemoteStore,
			  *ExportProject,
			  *Oplog,
			  NetworkPool,
			  WorkerPool,
			  MaxBlockSize,
			  MaxChunksPerBlock,
			  MaxChunkEmbedSize,
			  ChunkFileSizeLimit,
			  /*EmbedLooseFiles*/ true,
			  /*ForceUpload*/ false,
			  /*IgnoreMissingAttachments*/ false,
			  /*OptionalContext*/ nullptr);

	// Verify first export produced blocks.
	RemoteProjectStore::GetKnownBlocksResult KnownAfterFirst = RemoteStore->GetKnownBlocks();
	REQUIRE(!KnownAfterFirst.Blocks.empty());

	// Second export to the SAME store: triggers block reuse via GetKnownBlocks.
	// The arguments are identical to the first export; ForceUpload stays false.
	SaveOplog(Log(),
			  ExportCidStore,
			  *RemoteStore,
			  *ExportProject,
			  *Oplog,
			  NetworkPool,
			  WorkerPool,
			  MaxBlockSize,
			  MaxChunksPerBlock,
			  MaxChunkEmbedSize,
			  ChunkFileSizeLimit,
			  /*EmbedLooseFiles*/ true,
			  /*ForceUpload*/ false,
			  /*IgnoreMissingAttachments*/ false,
			  /*OptionalContext*/ nullptr);

	// Verify the container has no duplicate block entries.
	{
		RemoteProjectStore::LoadContainerResult ContainerResult = RemoteStore->LoadContainer();
		REQUIRE(ContainerResult.ErrorCode == 0);
		std::vector BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject);
		// The container must still list blocks after the second export.
		REQUIRE(!BlockHashes.empty());
		// Equal sizes after de-duplication means no hash appears twice.
		std::unordered_set UniqueBlockHashes(BlockHashes.begin(), BlockHashes.end());
		CHECK(UniqueBlockHashes.size() == BlockHashes.size());
	}

	// Collect all attachment hashes referenced by the oplog ops.
	std::unordered_set ExpectedHashes;
	Oplog->IterateOplogWithKey([&](int, const Oid&, CbObjectView Op) {
		Op.IterateAttachments([&](CbFieldView FieldView) { ExpectedHashes.insert(FieldView.AsAttachment()); });
	});
	// Guard against a vacuous pass of the final loop.
	REQUIRE(!ExpectedHashes.empty());

	// -- Import side (fresh, empty CAS) --------------------------------------
	GcManager ImportGc;
	CidStore ImportCidStore(ImportGc);
	CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas",
											 .TinyValueThreshold = 1024,
											 .HugeValueThreshold = 4096};
	ImportCidStore.Initialize(ImportCidConfig);

	std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore";
	ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{});
	Ref ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv,
													"proj1"sv,
													RootDir.string(),
													EngineRootDir.string(),
													ProjectRootDir.string(),
													ProjectFilePath.string()));

	Ref ImportOplog = ImportProject->NewOplog("oplog_reuse_rt_import", {});
	REQUIRE(ImportOplog);

	// Import from the same remote store into the empty CAS. No cache is supplied, and missing
	// attachments are not ignored (IgnoreMissingAttachments = false).
	LoadOplog(LoadOplogContext{.Log = Log(),
							   .ChunkStore = ImportCidStore,
							   .RemoteStore = *RemoteStore,
							   .Oplog = *ImportOplog,
							   .NetworkWorkerPool = NetworkPool,
							   .WorkerPool = WorkerPool,
							   .ForceDownload = true,
							   .IgnoreMissingAttachments = false,
							   .PartialBlockRequestMode = EPartialBlockRequestMode::All});

	// Every attachment hash from the original oplog must be present in the import CAS.
	for (const IoHash& Hash : ExpectedHashes)
	{
		CHECK_MESSAGE(ImportCidStore.ContainsChunk(Hash), "Missing chunk after import: ", Hash);
	}
}

TEST_SUITE_END();

#endif	// ZEN_WITH_TESTS

// Intentionally empty.
// NOTE(review): presumably referenced from another translation unit so the linker keeps this
// object file (and the tests registered in it) - confirm against the callers.
void
remoteprojectstore_forcelink()
{
}

}  // namespace zen