diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-14 23:12:04 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-14 23:12:04 +0100 |
| commit | 73a3944d15e09b437ad22f6899d8bfdcc96db339 (patch) | |
| tree | 9bf11fec06fcfa71ef006fc3c4f13f4449660fb8 | |
| parent | broke out ChunkAttachments helper (diff) | |
| download | zen-73a3944d15e09b437ad22f6899d8bfdcc96db339.tar.xz zen-73a3944d15e09b437ad22f6899d8bfdcc96db339.zip | |
operate on CompositeBuffer for block building
refactor and clean up
4 files changed, 458 insertions, 420 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 44d52451c..a04063c4c 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -5214,7 +5214,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, { BlockContent.emplace_back(std::make_pair( Content.ChunkedContent.ChunkHashes[ChunkIndex], - [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { + [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); ZEN_ASSERT(Chunk); uint64_t RawSize = Chunk.GetSize(); @@ -5224,7 +5224,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? 
OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel).GetCompressed()}; })); } diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index 11a38917c..0fe3c09ce 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -352,12 +352,9 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); for (const auto& It : FetchChunks) { - std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); - ZEN_ASSERT_SLOW(Chunk.second.DecodeRawSize() == Chunk.first); - ZEN_ASSERT_SLOW(Chunk.second.DecodeRawHash() == It.first); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk.second.Decompress().AsIoBuffer()) == It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); + std::pair<uint64_t, CompositeBuffer> Chunk = It.second(It.first); + uint64_t ChunkSize = 0; + std::span<const SharedBuffer> Segments = Chunk.second.GetSegments(); for (const SharedBuffer& Segment : Segments) { ZEN_ASSERT(Segment.IsOwned()); @@ -377,18 +374,6 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, CompressedBuffer CompressedBlock = CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); - -#if ZEN_BUILD_DEBUG - uint64_t VerifyHeaderSize = 0; - ZEN_ASSERT_SLOW(IterateChunkBlock( - CompressedBlock.Decompress(), - [](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { - ZEN_ASSERT(Chunk.DecodeRawHash() == AttachmentHash); - SharedBuffer Decompressed = Chunk.Decompress(); - 
ZEN_ASSERT(AttachmentHash == IoHash::HashBuffer(Decompressed.AsIoBuffer())); - }, - VerifyHeaderSize)); -#endif // ZEN_BUILD_DEBUG return CompressedBlock; } @@ -972,8 +957,8 @@ TEST_CASE("chunkblock.block") for (const auto& It : AttachmentsWithId) { Chunks.push_back( - std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Buffer.DecodeRawSize(), Buffer}; + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompositeBuffer> { + return {Buffer.DecodeRawSize(), Buffer.GetCompressed()}; })); } ChunkBlockDescription Block; @@ -1007,8 +992,8 @@ TEST_CASE("chunkblock.reuseblocks") for (const auto& It : AttachmentsWithId) { Chunks.push_back( - std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { - return {Buffer.DecodeRawSize(), Buffer}; + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompositeBuffer> { + return {Buffer.DecodeRawSize(), Buffer.GetCompressed()}; })); } ChunkBlockDescription Block; diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h index 931bb2097..e3a5f6539 100644 --- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h +++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h @@ -31,7 +31,7 @@ ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockO std::vector<ChunkBlockDescription> ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas); CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData); ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash); -typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc; +typedef 
std::function<std::pair<uint64_t, CompositeBuffer>(const IoHash& RawHash)> FetchChunkFunc; CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock); bool IterateChunkBlock(const SharedBuffer& BlockPayload, diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 51dd686bb..39e2f2413 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -253,6 +253,7 @@ namespace remotestore_impl { const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex]; const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex]; uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex]; + ZEN_ASSERT(CurrentOpAttachmentsSize <= m_Config.MaxChunkEmbedSize); std::vector<IoHash> CurrentOpRawHashes; CurrentOpRawHashes.push_back(FirstAttachmentHash); @@ -272,6 +273,7 @@ namespace remotestore_impl { } const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex]; uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex]; + ZEN_ASSERT(NextOpAttachmentSize <= m_Config.MaxChunkEmbedSize); if (CurrentOpAttachmentsSize + NextOpAttachmentSize > m_UsableBlockSize) { @@ -362,6 +364,7 @@ namespace remotestore_impl { ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); } + ZEN_ASSERT(AddedChunkSize <= CurrentOpAttachmentsSize); ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); @@ -410,6 +413,63 @@ namespace remotestore_impl { const uint64_t m_UsableBlockSize = 0; }; + IoBuffer CompressToTempFile(const IoHash& RawHash, + const IoBuffer& RawData, + const std::filesystem::path& AttachmentPath, + OodleCompressor Compressor, + OodleCompressionLevel 
CompressionLevel) + { + ZEN_ASSERT(!IsFile(AttachmentPath)); + BasicFile CompressedFile; + std::error_code Ec; + CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath)); + } + + if (RawData.GetSize() < 512u * 1024u) + { + CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), Compressor, CompressionLevel); + if (!CompressedBlob) + { + throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash)); + } + CompressedFile.Write(CompressedBlob.GetCompressed(), 0); + } + else + { + bool CouldCompress = CompressedBuffer::CompressToStream( + CompositeBuffer(SharedBuffer(RawData)), + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + CompressedFile.Write(RangeBuffer, Offset); + ZEN_UNUSED(SourceOffset, SourceSize); + }, + Compressor, + CompressionLevel); + if (!CouldCompress) + { + // Compressed is larger than source data... 
+ CompressedBuffer CompressedBlob = + CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None); + if (!CompressedBlob) + { + throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash)); + } + CompressedFile.SetFileSize(0); + CompressedFile.Write(CompressedBlob.GetCompressed(), 0); + } + } + IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + CompressedFile.Close(); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + TempAttachmentBuffer.SetDeleteOnClose(true); + ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); + return TempAttachmentBuffer; + } + struct FoundAttachment { std::filesystem::path RawPath; // If not stored in cid @@ -734,7 +794,8 @@ namespace remotestore_impl { } }); } - Work.Wait(200, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -747,6 +808,16 @@ namespace remotestore_impl { PendingWork, FindChunkSizesTimer.GetElapsedTimeMs()); }); + + if (!AbortFlag.load()) + { + remotestore_impl::ReportProgress(OptionalContext, + "Finding attachments"sv, + "", + UploadAttachments.size(), + 0, + FindChunkSizesTimer.GetElapsedTimeMs()); + } } struct ChunkedFile @@ -818,7 +889,7 @@ namespace remotestore_impl { }); } - Work.Wait(200, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -831,9 +902,190 @@ namespace 
remotestore_impl { PendingWork, ChunkAttachmentsTimer.GetElapsedTimeMs()); }); + + if (!AbortFlag.load()) + { + remotestore_impl::ReportProgress(OptionalContext, + "Chunking attachments"sv, + "", + AttachmentsToChunk.size(), + 0, + ChunkAttachmentsTimer.GetElapsedTimeMs()); + } return ChunkedFiles; } + void ResolveAttachments(CidStore& ChunkStore, + WorkerThreadPool& WorkerPool, + uint64_t MaxChunkEmbedSize, + const std::filesystem::path& AttachmentTempPath, + std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher>& UploadAttachments, + std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher>& LargeChunkAttachments, + std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher>& LooseUploadAttachments, + JobContext* OptionalContext) + { + if (UploadAttachments.empty()) + { + return; + } + Stopwatch UploadAttachmentsTimer; + + RwLock ResolveLock; + + std::atomic<bool> AbortFlag(false); + std::atomic<bool> PauseFlag(false); + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (auto& It : UploadAttachments) + { + if (remotestore_impl::IsCancelled(OptionalContext)) + { + AbortFlag.store(true); + break; + } + Work.ScheduleWork( + WorkerPool, + [&ChunkStore, + MaxChunkEmbedSize, + &AttachmentTempPath, + &ResolveLock, + &LargeChunkAttachments, + &LooseUploadAttachments, + UploadAttachment = &It.second, + RawHash = It.first, + OptionalContext](std::atomic<bool>& AbortFlag) { + if (remotestore_impl::IsCancelled(OptionalContext)) + { + AbortFlag.store(true); + } + if (AbortFlag) + { + return; + } + ZEN_ASSERT(UploadAttachment->Size != 0); + if (!UploadAttachment->RawPath.empty()) + { + if (UploadAttachment->Size > (MaxChunkEmbedSize * 2)) + { + // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't + // it will be a loose attachment instead of going into a block + + TGetAttachmentBufferFunc FetchFunc = [RawPath = UploadAttachment->RawPath, + AttachmentTempPath, + 
UploadAttachment](const IoHash& RawHash) -> CompositeBuffer { + IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); + if (!RawData) + { + throw std::runtime_error( + fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); + } + + std::filesystem::path AttachmentPath = AttachmentTempPath; + AttachmentPath.append(RawHash.ToHexString()); + + IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, + RawData, + AttachmentPath, + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + TempAttachmentBuffer.SetDeleteOnClose(true); + + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + AttachmentPath, + NiceBytes(UploadAttachment->Size), + NiceBytes(TempAttachmentBuffer.GetSize())); + return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); + }; + + RwLock::ExclusiveLockScope _(ResolveLock); + LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); + } + else + { + // Compress inline - check compressed size to see if it should go into a block or not + IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); + if (!RawData) + { + throw std::runtime_error( + fmt::format("Failed to read source file for blob {} from '{}'", RawHash, UploadAttachment->RawPath)); + } + + std::filesystem::path TempFilePath = AttachmentTempPath; + TempFilePath.append(RawHash.ToHexString()); + + IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, + RawData, + TempFilePath, + OodleCompressor::Mermaid, + OodleCompressionLevel::VeryFast); + TempAttachmentBuffer.SetDeleteOnClose(true); + + uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); + + ZEN_INFO("Saved temp attachment to '{}', {} ({})", + TempFilePath, + NiceBytes(UploadAttachment->Size), + NiceBytes(CompressedSize)); + + if (CompressedSize > MaxChunkEmbedSize) + { + TGetAttachmentBufferFunc FetchFunc = [Data = std::move(TempAttachmentBuffer)](const IoHash&) { + return 
CompositeBuffer(SharedBuffer(std::move(Data))); + }; + + RwLock::ExclusiveLockScope _(ResolveLock); + LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); + } + else + { + UploadAttachment->Size = CompressedSize; + + std::pair<uint64_t, IoBuffer> LooseAttachment(RawData.GetSize(), std::move(TempAttachmentBuffer)); + + RwLock::ExclusiveLockScope _(ResolveLock); + LooseUploadAttachments.insert_or_assign(RawHash, std::move(LooseAttachment)); + } + } + } + else + { + if (UploadAttachment->Size > MaxChunkEmbedSize) + { + TGetAttachmentBufferFunc FetchFunc = [&ChunkStore](const IoHash& RawHash) { + return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash)))); + }; + RwLock::ExclusiveLockScope _(ResolveLock); + LargeChunkAttachments.insert_or_assign(RawHash, std::move(FetchFunc)); + } + } + }); + } + + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + if (remotestore_impl::IsCancelled(OptionalContext)) + { + AbortFlag.store(true); + } + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{}{} remaining...", AbortFlag.load() ? 
"Aborting, " : "", PendingWork), + UploadAttachments.size(), + PendingWork, + UploadAttachmentsTimer.GetElapsedTimeMs()); + }); + + if (!AbortFlag.load()) + { + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + "", + UploadAttachments.size(), + 0, + UploadAttachmentsTimer.GetElapsedTimeMs()); + } + } + RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) { using namespace std::literals; @@ -941,70 +1193,6 @@ namespace remotestore_impl { JobContext* m_OptionalContext; }; - IoBuffer CompressToTempFile(const IoHash& RawHash, - const IoBuffer& RawData, - const std::filesystem::path& AttachmentPath, - OodleCompressor Compressor, - OodleCompressionLevel CompressionLevel) - { - ZEN_ASSERT(!IsFile(AttachmentPath)); - BasicFile CompressedFile; - std::error_code Ec; - CompressedFile.Open(AttachmentPath, BasicFile::Mode::kTruncateDelete, Ec); - if (Ec) - { - throw std::system_error(Ec, fmt::format("Failed to create temp file for blob {} at '{}'", RawHash, AttachmentPath)); - } - - bool CouldCompress = CompressedBuffer::CompressToStream( - CompositeBuffer(SharedBuffer(RawData)), - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - CompressedFile.Write(RangeBuffer, Offset); - ZEN_UNUSED(SourceOffset, SourceSize); - // StreamRawBytes += SourceSize; - // StreamCompressedBytes += RangeBuffer.GetSize(); - }, - Compressor, - CompressionLevel); - if (CouldCompress) - { -#if ZEN_BUILD_DEBUG - uint64_t CompressedSize = CompressedFile.FileSize(); - // void* FileHandle = CompressedFile.Handle(); - IoBuffer TempPayload = IoBuffer(IoBuffer::BorrowedFile, CompressedFile.Handle(), 0, CompressedSize); - // ZEN_ASSERT(TempPayload); - // TempPayload.SetDeleteOnClose(true); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = 
CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), VerifyRawHash, VerifyRawSize); - ZEN_ASSERT(Compressed); - ZEN_ASSERT(RawHash == VerifyRawHash); - ZEN_ASSERT(RawData.GetSize() == VerifyRawSize); - // CompressedFile.Attach(FileHandle); -#endif // ZEN_BUILD_DEBUG - } - else - { - // Compressed is larger than source data... - CompressedBuffer CompressedBlob = - CompressedBuffer::Compress(SharedBuffer(std::move(RawData)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - if (!CompressedBlob) - { - throw std::runtime_error(fmt::format("Failed to compress blob {}", RawHash)); - } - CompressedFile.SetFileSize(0); - CompressedFile.Write(CompressedBlob.GetCompressed(), 0); - } - IoBuffer TempAttachmentBuffer = IoBufferBuilder::MakeFromFile(AttachmentPath); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); - CompressedFile.Close(); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); - TempAttachmentBuffer.SetDeleteOnClose(true); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)).CompressedBuffer::Decompress()); - return TempAttachmentBuffer; - } - void DownloadAndSaveBlockChunks(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, @@ -2265,7 +2453,7 @@ namespace remotestore_impl { { auto It = BulkBlockAttachmentsToUpload.find(Chunk); ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompressedBuffer ChunkPayload = It->second(It->first).second; + CompositeBuffer ChunkPayload = It->second(It->first).second; if (!ChunkPayload) { RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), @@ -2274,8 +2462,8 @@ namespace remotestore_impl { ChunkBuffers.clear(); break; } - ChunksSize += ChunkPayload.GetCompressedSize(); - 
ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); + ChunksSize += ChunkPayload.GetSize(); + ChunkBuffers.emplace_back(SharedBuffer(ChunkPayload.Flatten().AsIoBuffer())); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) @@ -2583,268 +2771,33 @@ BuildContainer(CidStore& ChunkStore, } } - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LargeChunkAttachments; std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); - Stopwatch ResolveAttachmentsProgressTimer; - Latch ResolveAttachmentsLatch(1); - for (auto& It : UploadAttachments) - { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - return {}; - } - - ResolveAttachmentsLatch.AddCount(1); - - WorkerPool.ScheduleWork( - [&ChunkStore, - UploadAttachment = &It.second, - RawHash = It.first, - &ResolveAttachmentsLatch, - &ResolveLock, - &ChunkedHashes, - &LargeChunkHashes, - &LooseUploadAttachments, - &MissingHashes, - &OnLargeAttachment, - &AttachmentTempPath, - MaxChunkEmbedSize, - ChunkFileSizeLimit, - AllowChunking, - &RemoteResult, - OptionalContext]() { - ZEN_TRACE_CPU("PrepareChunk"); - - auto _ = MakeGuard([&ResolveAttachmentsLatch] { ResolveAttachmentsLatch.CountDown(); }); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - return; - } - - try - { - ZEN_ASSERT(UploadAttachment->Size != 0); - if (!UploadAttachment->RawPath.empty()) - { - if 
(UploadAttachment->Size > (MaxChunkEmbedSize * 2)) - { - // Assume the compressed file is going to be larger than MaxChunkEmbedSize, even if it isn't - // it will be a loose attachment instead of going into a block - OnLargeAttachment( - RawHash, - [RawPath = UploadAttachment->RawPath, AttachmentTempPath, UploadAttachment]( - const IoHash& RawHash) -> CompositeBuffer { - IoBuffer RawData = IoBufferBuilder::MakeFromFile(RawPath); - if (!RawData) - { - throw std::runtime_error( - fmt::format("Failed to read source file for blob {} from '{}'", RawHash, RawPath)); - } - - std::filesystem::path AttachmentPath = AttachmentTempPath; - AttachmentPath.append(RawHash.ToHexString()); - - IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, - RawData, - AttachmentPath, - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - TempAttachmentBuffer.SetDeleteOnClose(true); - -#if ZEN_BUILD_DEBUG - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - ZEN_ASSERT_SLOW( - CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize)); - ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); -#endif // ZEN_BUILD_DEBUG - - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - AttachmentPath, - NiceBytes(UploadAttachment->Size), - NiceBytes(TempAttachmentBuffer.GetSize())); - return CompositeBuffer(SharedBuffer(std::move(TempAttachmentBuffer))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - // Compress inline - check compressed size to see if it should go into a block or not - IoBuffer RawData = IoBufferBuilder::MakeFromFile(UploadAttachment->RawPath); - if (!RawData) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to read attachment {}", UploadAttachment->RawPath), - ""); - return; - } - - std::filesystem::path TempFilePath = AttachmentTempPath; - 
TempFilePath.append(RawHash.ToHexString()); - - try - { - IoBuffer TempAttachmentBuffer = remotestore_impl::CompressToTempFile(RawHash, - RawData, - TempFilePath, - OodleCompressor::Mermaid, - OodleCompressionLevel::VeryFast); - TempAttachmentBuffer.SetDeleteOnClose(true); -#if ZEN_BUILD_DEBUG - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - ZEN_ASSERT_SLOW( - CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), VerifyRawHash, VerifyRawSize)); - ZEN_ASSERT_SLOW(CompressedBuffer::FromCompressedNoValidate(IoBuffer(TempAttachmentBuffer)) - .CompressedBuffer::Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash == RawHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawData.GetSize()); -#endif // ZEN_BUILD_DEBUG - - uint64_t CompressedSize = TempAttachmentBuffer.GetSize(); - - ZEN_INFO("Saved temp attachment to '{}', {} ({})", - TempFilePath, - NiceBytes(UploadAttachment->Size), - NiceBytes(CompressedSize)); - - if (CompressedSize > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { - return CompositeBuffer(SharedBuffer(std::move(Data))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - else - { - UploadAttachment->Size = CompressedSize; - { - IoHash VerifyRawHash2; - uint64_t VerifyRawSize2; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempAttachmentBuffer), - VerifyRawHash2, - VerifyRawSize2); - ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash2 == RawHash); - ZEN_ASSERT_SLOW(VerifyRawSize2 == RawData.GetSize()); - } - - ResolveLock.WithExclusiveLock([RawHash, - RawSize = RawData.GetSize(), - &LooseUploadAttachments, - Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); - }); - } - } - catch (const std::system_error& SysEx) - { - 
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to compress blob {} to temporary file '{}', reason: ({}) {}", - RawHash, - TempFilePath, - SysEx.code().value(), - SysEx.what()), - ""); - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to compress blob {} to temporary file '{}', reason: {}", - RawHash, - TempFilePath, - Ex.what()), - ""); - } - } - } - else - { - if (UploadAttachment->Size > MaxChunkEmbedSize) - { - OnLargeAttachment(RawHash, [&ChunkStore](const IoHash& RawHash) { - return CompositeBuffer(SharedBuffer(std::move(ChunkStore.FindChunkByCid(RawHash)))); - }); - ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); - } - } - } - catch (const std::exception& Ex) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), - fmt::format("Failed to resolve attachment {}", RawHash), - Ex.what()); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - ResolveAttachmentsLatch.CountDown(); - - { - while (!ResolveAttachmentsLatch.Wait(1000)) - { - ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!ResolveAttachmentsLatch.Wait(1000)) - { - Remaining = ResolveAttachmentsLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - "Aborted"sv, - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - "Aborted"sv, - UploadAttachments.size(), - 0, - 
ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - return {}; - } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } - if (UploadAttachments.size() > 0) - { - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - ""sv, - UploadAttachments.size(), - 0, - ResolveAttachmentsProgressTimer.GetElapsedTimeMs()); - } - } + ResolveAttachments(ChunkStore, + WorkerPool, + MaxChunkEmbedSize, + AttachmentTempPath, + UploadAttachments, + LargeChunkAttachments, + LooseUploadAttachments, + OptionalContext); if (remotestore_impl::IsCancelled(OptionalContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return {}; } - for (const auto& It : LargeChunkHashes) + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + + for (auto& It : LargeChunkAttachments) { - UploadAttachments.erase(It); + UploadAttachments.erase(It.first); + LargeChunkHashes.insert(It.first); + OnLargeAttachment(It.first, std::move(It.second)); } RwLock BlocksLock; @@ -3022,38 +2975,23 @@ BuildContainer(CidStore& ChunkStore, { uint64_t RawSize = It->second.first; IoBuffer Payload = It->second.second; - - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), VerifyRawHash, VerifyRawSize); - ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash == AttachmentHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); - - LooseUploadAttachments.erase(It); - return [RawSize = RawSize, - Payload = SharedBuffer(Payload)](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { + return [RawSize = RawSize, Payload = 
Payload](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> { ZEN_UNUSED(ChunkHash); - // CompressedBuffer Compressed = - // CompressedBuffer::FromCompressedNoValidate(Payload.AsIoBuffer()); - IoHash VerifyRawHash; - uint64_t VerifyRawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Payload, VerifyRawHash, VerifyRawSize); - ZEN_ASSERT_SLOW(Compressed.Decompress()); - ZEN_ASSERT_SLOW(VerifyRawHash == ChunkHash); - ZEN_ASSERT_SLOW(VerifyRawSize == RawSize); - return {RawSize, Compressed}; + return {RawSize, CompositeBuffer(SharedBuffer(std::move(Payload)))}; }; } else { - ZEN_ASSERT_SLOW(ChunkStore.ContainsChunk(AttachmentHash)); - return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + return [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompositeBuffer> { IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); if (!Chunk) { throw std::runtime_error(fmt::format("Failed to find chunk {} in cid store", RawHash)); } + + // These are small chunks - make memory resident + Chunk = IoBufferBuilder::ReadFromFileMaybe(Chunk); + IoHash ValidateRawHash; uint64_t RawSize = 0; CompressedBuffer Compressed = @@ -3066,7 +3004,7 @@ BuildContainer(CidStore& ChunkStore, { throw std::runtime_error(fmt::format("Chunk {} in cid store is malformed (mismatching raw hash)", RawHash)); } - return {RawSize, Compressed}; + return {RawSize, Compressed.GetCompressed()}; }; } }; @@ -3118,7 +3056,9 @@ BuildContainer(CidStore& ChunkStore, return Compressed.GetCompressed(); }); - ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { LargeChunkHashes.insert(ChunkHash); }); + // ResolveLock.WithExclusiveLock([ChunkHash, &LargeChunkHashes]() { + LargeChunkHashes.insert(ChunkHash); + // }); } else { @@ -3145,11 +3085,12 @@ BuildContainer(CidStore& ChunkStore, ZEN_ASSERT(Source.Offset + Source.Size <= ChunkedFile.Source.GetSize()); return [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( - 
const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + const IoHash&) -> std::pair<uint64_t, CompositeBuffer> { return {Size, CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), OodleCompressor::Mermaid, - OodleCompressionLevel::None)}; + OodleCompressionLevel::None) + .GetCompressed()}; }; } else @@ -6915,7 +6856,7 @@ TEST_CASE("project.store.blockcomposer.path_b_fits_pending") Oid Op1 = MakeTestOid(1); std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)}; - std::vector<uint64_t> Sizes = {200, 300}; + std::vector<uint64_t> Sizes = {60, 80}; // each <= MaxChunkEmbedSize (100); sum=140 << UsableSize (1000) std::vector<Oid> Keys = {Op1, Op1}; std::vector<std::vector<IoHash>> Blocks; @@ -6958,19 +6899,22 @@ TEST_CASE("project.store.blockcomposer.path_c_75pct_flush") // Path C: the pending block is >75% full by bytes when a new op arrives that does not fit. // The pending block is flushed first; the new op chunk is then placed via Path B, and // emitted by the final flush. + // + // UsableSize=100 so that 75% threshold=75. MaxChunkEmbedSize is fixed at 100 by MakeTestConfig, + // so Op1=80 bytes exceeds the 75% threshold within a single chunk. 
using namespace projectstore_testutils; - constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes + constexpr uint64_t UsableSize = 100; // 75% threshold = 75 bytes constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); - // Op1: 800 bytes -> Path B, pending = {800 bytes, 1 chunk} (800 > 750) - // Op2: 300 bytes -> does not fit (800+300=1100 > 1000) and 800 > 750 -> Path C flush, - // then Path B, pending = {300 bytes} -> final flush + // Op1: 80 bytes -> Path B, pending = {80 bytes, 1 chunk} (80 > 75) + // Op2: 30 bytes -> does not fit (80+30=110 > 100) and 80 > 75 -> Path C flush, + // then Path B, pending = {30 bytes} -> final flush std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)}; - std::vector<uint64_t> Sizes = {800, 300}; + std::vector<uint64_t> Sizes = {80, 30}; std::vector<Oid> Keys = {Op1, Op2}; std::vector<std::vector<IoHash>> Blocks; @@ -7063,7 +7007,7 @@ TEST_CASE("project.store.blockcomposer.final_flush") Oid Op2 = MakeTestOid(2); Oid Op3 = MakeTestOid(3); std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)}; - std::vector<uint64_t> Sizes = {100, 200, 150}; + std::vector<uint64_t> Sizes = {60, 80, 70}; // each <= MaxChunkEmbedSize (100); sum=210 << UsableSize (1000) std::vector<Oid> Keys = {Op1, Op2, Op3}; std::vector<std::vector<IoHash>> Blocks; @@ -7084,19 +7028,21 @@ TEST_CASE("project.store.blockcomposer.path_b_b_c") // by the final flush. using namespace projectstore_testutils; - constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes + // UsableSize=200 so that 75% threshold=150. MaxChunkEmbedSize is fixed at 100 by MakeTestConfig, + // so two ops of 90 bytes each accumulate 180 bytes in pending, exceeding the threshold. 
+ constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes constexpr uint64_t MaxChunks = 8; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); Oid Op3 = MakeTestOid(3); - // Op1: 400 bytes -> Path B, pending = {400 bytes, 1 chunk} - // Op2: 400 bytes -> Path B, pending = {800 bytes, 2 chunks} (800 > 750) - // Op3: 300 bytes -> does not fit (1100 > 1000) and 800 > 750 -> Path C flush -> block 1 - // then Path B, pending = {300 bytes} -> final flush -> block 2 + // Op1: 90 bytes -> Path B, pending = {90 bytes, 1 chunk} (90 <= 150) + // Op2: 90 bytes -> Path B, pending = {180 bytes, 2 chunks} (180 > 150) + // Op3: 60 bytes -> does not fit (180+60=240 > 200) and 180 > 150 -> Path C flush -> block 1 + // then Path B, pending = {60 bytes} -> final flush -> block 2 std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3)}; - std::vector<uint64_t> Sizes = {400, 400, 300}; + std::vector<uint64_t> Sizes = {90, 90, 60}; std::vector<Oid> Keys = {Op1, Op2, Op3}; std::vector<std::vector<IoHash>> Blocks; @@ -7110,12 +7056,11 @@ TEST_CASE("project.store.blockcomposer.path_b_b_c") CHECK(Blocks[1][0] == MakeTestHash(3)); } -TEST_CASE("project.store.blockcomposer.path_d_b") +TEST_CASE("project.store.blockcomposer.path_a_b_final_flush") { - // Multi-step: Path D -> Path B - // Path D fills and flushes the pending block with a partial set of the current op chunks. - // The remainder of that op then fits into the freshly emptied pending block via Path B, - // and is emitted by the final flush. + // Multi-step: Path A -> Path B -> final flush + // The first op exactly fills a block (count-saturated, Path A). The second op fits into + // the now-empty pending block via Path B and is emitted by the final flush. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; @@ -7124,47 +7069,86 @@ TEST_CASE("project.store.blockcomposer.path_d_b") Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); - // Op1: 3 x 100 bytes -> Path B, pending = {3 chunks, 300 bytes} - // Op2: 2 x 100 bytes -> count 3+2=5 > 4; bytes 300+200=500 <= 1000; 300 <= 750 -> Path D - // D adds op2[0] to pending (4 chunks, count capacity reached), flushes -> block 1 - // op2[1] remaining -> Path B (pending empty) -> final flush -> block 2 - std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5)}; - std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100}; - std::vector<Oid> Keys = {Op1, Op1, Op1, Op2, Op2}; + // Op1: 4 x 100 bytes -> MaxChunksPerBlock reached -> CurrentOpFillFullBlock=true -> Path A + // Op2: 2 x 100 bytes -> Path B (pending empty) -> final flush + std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)}; + std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100, 100}; + std::vector<Oid> Keys = {Op1, Op1, Op1, Op1, Op2, Op2}; std::vector<std::vector<IoHash>> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + // Block 1: Path A standalone (all 4 Op1 chunks). Block 2: final flush of Op2 (2 chunks). REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 4); + CHECK(Blocks[0][0] == MakeTestHash(1)); CHECK(Blocks[0][3] == MakeTestHash(4)); - CHECK(Blocks[1].size() == 1); + CHECK(Blocks[1].size() == 2); CHECK(Blocks[1][0] == MakeTestHash(5)); + CHECK(Blocks[1][1] == MakeTestHash(6)); } -TEST_CASE("project.store.blockcomposer.path_c_b") +TEST_CASE("project.store.blockcomposer.empty_input") { - // Multi-step: Path C -> Path B - // Path C flushes the pending block. 
The op chunk that triggered Path C is then placed - // via Path B into the freshly emptied pending block, and emitted by the final flush. + // Compose called with zero attachments emits no blocks and does not invoke the final flush. using namespace projectstore_testutils; - constexpr uint64_t UsableSize = 1000; // 75% threshold = 750 bytes + constexpr uint64_t UsableSize = 1000; + constexpr uint64_t MaxChunks = 4; + remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); + + std::vector<std::vector<IoHash>> Blocks; + Composer.Compose({}, {}, {}, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + + CHECK(Blocks.empty()); +} + +TEST_CASE("project.store.blockcomposer.single_attachment") +{ + // A single chunk from a single op: the gather inner loop never runs. + // Path B places it into the empty pending block; the final flush emits it. + using namespace projectstore_testutils; + + constexpr uint64_t UsableSize = 1000; + constexpr uint64_t MaxChunks = 4; + remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); + + Oid Op1 = MakeTestOid(1); + std::vector<IoHash> Hashes = {MakeTestHash(1)}; + std::vector<uint64_t> Sizes = {60}; + std::vector<Oid> Keys = {Op1}; + + std::vector<std::vector<IoHash>> Blocks; + Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + + REQUIRE(Blocks.size() == 1); + CHECK(Blocks[0].size() == 1); + CHECK(Blocks[0][0] == MakeTestHash(1)); +} + +TEST_CASE("project.store.blockcomposer.path_a_size_saturation") +{ + // Path A triggered by size overflow in the gather phase (not count). + // Op1 has 2 chunks of 60 bytes; adding the second would exceed UsableSize=100. + // The gather loop sets CurrentOpFillFullBlock=true after collecting only the first chunk, + // which Path A emits as a standalone block. The second chunk is then processed in the next + // outer-loop iteration and placed via Path B, emitted by the final flush. 
+ using namespace projectstore_testutils; + + constexpr uint64_t UsableSize = 100; // MaxChunkEmbedSize=100; two 60-byte chunks overflow constexpr uint64_t MaxChunks = 4; remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); Oid Op1 = MakeTestOid(1); - Oid Op2 = MakeTestOid(2); - // Op1: 800 bytes -> Path B, pending = {800 bytes, 1 chunk} - // Op2: 300 bytes -> does not fit (1100 > 1000) and 800 > 750 -> Path C flush -> block 1 - // then Path B, pending = {300 bytes} -> final flush -> block 2 + // chunk0=60, chunk1=60: 60+60=120 > UsableSize=100 -> size overflow after gathering chunk0 std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)}; - std::vector<uint64_t> Sizes = {800, 300}; - std::vector<Oid> Keys = {Op1, Op2}; + std::vector<uint64_t> Sizes = {60, 60}; + std::vector<Oid> Keys = {Op1, Op1}; std::vector<std::vector<IoHash>> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + // Block 1: Path A standalone (chunk0 only). Block 2: final flush of chunk1. REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 1); CHECK(Blocks[0][0] == MakeTestHash(1)); @@ -7172,11 +7156,78 @@ TEST_CASE("project.store.blockcomposer.path_c_b") CHECK(Blocks[1][0] == MakeTestHash(2)); } -TEST_CASE("project.store.blockcomposer.path_a_b_final_flush") +TEST_CASE("project.store.blockcomposer.path_b_exact_size_fill") { - // Multi-step: Path A -> Path B -> final flush - // The first op exactly fills a block (count-saturated, Path A). The second op fits into - // the now-empty pending block via Path B and is emitted by the final flush. + // Path B immediate flush triggered by exact byte fill (PendingBlockSize == UsableBlockSize), + // as opposed to the count-fill case covered by path_b_exact_count_fill. + // Op1 adds 60 bytes; Op2 adds the remaining 40 bytes to exactly reach UsableSize=100. + // Path B flushes immediately after Op2 merges; no separate final flush is needed. 
+ using namespace projectstore_testutils; + + constexpr uint64_t UsableSize = 100; + constexpr uint64_t MaxChunks = 4; + remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); + + Oid Op1 = MakeTestOid(1); + Oid Op2 = MakeTestOid(2); + // Op1: 60 bytes -> Path B, pending = {60 bytes, 1 chunk} + // Op2: 40 bytes -> 60+40=100 == UsableSize -> Path B, immediate size-exact flush + std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2)}; + std::vector<uint64_t> Sizes = {60, 40}; + std::vector<Oid> Keys = {Op1, Op2}; + + std::vector<std::vector<IoHash>> Blocks; + Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + + REQUIRE(Blocks.size() == 1); + CHECK(Blocks[0].size() == 2); + CHECK(Blocks[0][0] == MakeTestHash(1)); + CHECK(Blocks[0][1] == MakeTestHash(2)); +} + +TEST_CASE("project.store.blockcomposer.path_d_size_limited_greedy") +{ + // Path D where the greedy fill loop stops because the next chunk would exceed + // UsableBlockSize by bytes (not because count reaches MaxChunksPerBlock). + // + // UsableSize=200, MaxChunks=8 (high count cap to ensure size is the binding constraint). + // Op1 puts 90 bytes in pending. Op2 has 3x60-byte chunks (total 180); they don't fit + // (90+180=270>200) and pending is <=75% full (90<=150) -> Path D. + // Greedy loop adds Op2[0] (90+60=150<=200), then Op2[1] would overflow (150+60=210>200) + // -> breaks on size. Pending [Op1,Op2[0]] is flushed as block 1. + // Op2[1] and Op2[2] (120 bytes total) fit in the empty pending via Path B -> final flush + // -> block 2. 
+ using namespace projectstore_testutils; + + constexpr uint64_t UsableSize = 200; // 75% threshold = 150 bytes + constexpr uint64_t MaxChunks = 8; + remotestore_impl::BlockComposer Composer(MakeTestConfig(UsableSize, MaxChunks)); + + Oid Op1 = MakeTestOid(1); + Oid Op2 = MakeTestOid(2); + std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4)}; + std::vector<uint64_t> Sizes = {90, 60, 60, 60}; + std::vector<Oid> Keys = {Op1, Op2, Op2, Op2}; + + std::vector<std::vector<IoHash>> Blocks; + Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); + + // Block 1: [Op1, Op2[0]] (Path D size-limited flush). Block 2: [Op2[1], Op2[2]] (final flush). + REQUIRE(Blocks.size() == 2); + CHECK(Blocks[0].size() == 2); + CHECK(Blocks[0][0] == MakeTestHash(1)); + CHECK(Blocks[0][1] == MakeTestHash(2)); + CHECK(Blocks[1].size() == 2); + CHECK(Blocks[1][0] == MakeTestHash(3)); + CHECK(Blocks[1][1] == MakeTestHash(4)); +} + +TEST_CASE("project.store.blockcomposer.path_a_pending_untouched") +{ + // Path A leaves the pending block untouched. Op1 accumulates 2 chunks in pending via + // Path B. Op2 then count-saturates the gather phase (4 chunks == MaxChunksPerBlock), + // triggering Path A which emits Op2 as a standalone block without disturbing pending. + // The final flush then emits Op1's 2 chunks still held in pending. 
using namespace projectstore_testutils; constexpr uint64_t UsableSize = 1000; @@ -7185,23 +7236,25 @@ TEST_CASE("project.store.blockcomposer.path_a_b_final_flush") Oid Op1 = MakeTestOid(1); Oid Op2 = MakeTestOid(2); - // Op1: 4 x 100 bytes -> MaxChunksPerBlock reached -> CurrentOpFillFullBlock=true -> Path A - // Op2: 2 x 100 bytes -> Path B (pending empty) -> final flush + // Op1: 2 x 60 bytes -> Path B, pending = {2 chunks, 120 bytes} + // Op2: 4 x 100 bytes -> count reaches MaxChunks=4 -> CurrentOpFillFullBlock=true -> Path A + // Path A emits Op2 standalone as block 1; pending (Op1's chunks) is left untouched. + // Final flush emits pending -> block 2. std::vector<IoHash> Hashes = {MakeTestHash(1), MakeTestHash(2), MakeTestHash(3), MakeTestHash(4), MakeTestHash(5), MakeTestHash(6)}; - std::vector<uint64_t> Sizes = {100, 100, 100, 100, 100, 100}; - std::vector<Oid> Keys = {Op1, Op1, Op1, Op1, Op2, Op2}; + std::vector<uint64_t> Sizes = {60, 60, 100, 100, 100, 100}; + std::vector<Oid> Keys = {Op1, Op1, Op2, Op2, Op2, Op2}; std::vector<std::vector<IoHash>> Blocks; Composer.Compose(Hashes, Sizes, Keys, [&](std::vector<IoHash>&& B) { Blocks.push_back(std::move(B)); }); - // Block 1: Path A standalone (all 4 Op1 chunks). Block 2: final flush of Op2 (2 chunks). + // Block 1: Path A standalone (Op2's 4 chunks). Block 2: final flush of Op1's 2 pending chunks. REQUIRE(Blocks.size() == 2); CHECK(Blocks[0].size() == 4); - CHECK(Blocks[0][0] == MakeTestHash(1)); - CHECK(Blocks[0][3] == MakeTestHash(4)); + CHECK(Blocks[0][0] == MakeTestHash(3)); + CHECK(Blocks[0][3] == MakeTestHash(6)); CHECK(Blocks[1].size() == 2); - CHECK(Blocks[1][0] == MakeTestHash(5)); - CHECK(Blocks[1][1] == MakeTestHash(6)); + CHECK(Blocks[1][0] == MakeTestHash(1)); + CHECK(Blocks[1][1] == MakeTestHash(2)); } TEST_SUITE_END(); |