| author | Dan Engelbrecht <[email protected]> | 2026-03-14 12:09:44 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-03-14 14:19:08 +0100 |
| commit | a214747b9339a2701831f01b4034bbbf223ac194 (patch) | |
| tree | 390f093d570bd31df2d0b451bca41815e0f13fc8 /src | |
| parent | exception guards in httpprojectstore (diff) | |
refactored out block composer
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 602 |
1 file changed, 294 insertions, 308 deletions
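
The diff below lifts the inline block-assembly loop out of `BuildContainer` into a standalone `remotestore_impl::BlockComposer` class. At its core the composer is a greedy packer: it walks the sorted upload attachments, keeps chunks that share an op key together, and emits a block whenever the next chunk would overflow either the usable byte budget or the per-block chunk cap. As a rough orientation for reading the diff, here is a minimal sketch of just that core policy; `PackerConfig`, `PackChunks`, and the index-based callback are hypothetical names, and the op-key grouping and the 75%-fill flush heuristic of the real class are deliberately omitted.

```cpp
// Minimal sketch of the greedy packing policy (hypothetical names; the real
// BlockComposer additionally groups chunks by op key and applies a 75%-fill
// flush heuristic).
#include <cstdint>
#include <functional>
#include <span>
#include <vector>

struct PackerConfig
{
    uint64_t MaxBlockSize = 0;      // usable payload bytes per block
    uint64_t MaxChunksPerBlock = 0; // cap on chunk count per block
};

// Walks the chunk sizes in order and invokes OnBlock with the indices of the
// chunks placed in each emitted block. A block is flushed as soon as adding
// the next chunk would exceed either limit; a single chunk larger than
// MaxBlockSize ends up in a block of its own.
inline void PackChunks(const PackerConfig& Config,
                       std::span<const uint64_t> ChunkSizes,
                       const std::function<void(std::vector<size_t>&&)>& OnBlock)
{
    std::vector<size_t> Pending;
    uint64_t PendingSize = 0;
    for (size_t Index = 0; Index < ChunkSizes.size(); ++Index)
    {
        bool WouldOverflow = PendingSize + ChunkSizes[Index] > Config.MaxBlockSize ||
                             Pending.size() == Config.MaxChunksPerBlock;
        if (WouldOverflow && !Pending.empty())
        {
            OnBlock(std::move(Pending));
            Pending.clear(); // moved-from; reset to a known-empty state
            PendingSize = 0;
        }
        Pending.push_back(Index);
        PendingSize += ChunkSizes[Index];
    }
    if (!Pending.empty())
    {
        OnBlock(std::move(Pending)); // flush the final partial block
    }
}
```

For example, with `MaxBlockSize = 8` and `MaxChunksPerBlock = 2`, chunk sizes {3, 4, 2, 5} pack into two blocks: indices {0, 1} (7 bytes) and {2, 3} (7 bytes).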
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 123e8d3be..ed734aa21 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -66,6 +66,8 @@ namespace zen { } */ namespace remotestore_impl { + using namespace std::literals; + ////////////////////////////// AsyncRemoteResult struct AsyncRemoteResult @@ -170,6 +172,220 @@ namespace remotestore_impl { return BlockIndex; } + class BlockComposer + { + public: + struct Configuration + { + uint64_t MaxBlockSize = 0; + uint64_t MaxChunksPerBlock = 0; + uint64_t MaxChunkEmbedSize = 0; + std::function<bool()> IsCancelledFunc; + }; + + explicit BlockComposer(const Configuration& Config) : m_Config(Config), m_UsableBlockSize(CalculateUsableBlockSize(m_Config)) {} + + void Compose(std::span<const IoHash> AttachmentHashes, + std::span<const uint64_t> AttachmentSizes, + std::span<const Oid> AttachmentKeys, + const std::function<void(std::vector<IoHash>&& ChunkRawHashes)>& OnNewBlock) + { + std::vector<IoHash> PendingChunkHashes; + uint64_t PendingBlockSize = 0; + + size_t SortedUploadAttachmentsIndex = 0; + + Stopwatch AssembleBlocksProgressTimer; + while (SortedUploadAttachmentsIndex < AttachmentHashes.size()) + { + if (m_Config.IsCancelledFunc && m_Config.IsCancelledFunc()) + { + return; + } + + const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex]; + const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex]; + uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex]; + + std::vector<IoHash> CurrentOpRawHashes; + CurrentOpRawHashes.push_back(FirstAttachmentHash); + + std::vector<uint64_t> CurrentOpChunkSizes; + CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize); + + bool CurrentOpFillFullBlock = false; + + while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size()) + { + size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size(); + const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex]; + if (NextAttachmentOpKey != FirstAttachmentOpKey) + { + break; + } + const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex]; + uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex]; + + if (CurrentOpAttachmentsSize + NextOpAttachmentSize > m_UsableBlockSize) + { + CurrentOpFillFullBlock = true; + break; + } + CurrentOpRawHashes.push_back(NextAttachmentHash); + CurrentOpChunkSizes.push_back(NextOpAttachmentSize); + CurrentOpAttachmentsSize += NextOpAttachmentSize; + + if (CurrentOpRawHashes.size() == m_Config.MaxChunksPerBlock) + { + CurrentOpFillFullBlock = true; + break; + } + } + + SortedUploadAttachmentsIndex += CurrentOpChunkSizes.size(); + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); + // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(), + // CurrentOpChunkSizes.end(), uint64_t(0u))); + + while (!CurrentOpChunkSizes.empty()) + { + ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); + + size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); + ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize); + ZEN_ASSERT(CurrentOpAttachmentCount <= m_Config.MaxChunksPerBlock); + + if 
(CurrentOpFillFullBlock) + { + ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize); + ZEN_ASSERT(CurrentOpRawHashes.size() <= m_Config.MaxChunksPerBlock); + OnNewBlock(std::move(CurrentOpRawHashes)); + CurrentOpChunkSizes.clear(); + CurrentOpAttachmentsSize = 0; + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + } + else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= m_UsableBlockSize && + (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= m_Config.MaxChunksPerBlock) + { + // All attachments for the op fit in the current block... + PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end()); + PendingBlockSize += CurrentOpAttachmentsSize; + + CurrentOpRawHashes.clear(); + CurrentOpChunkSizes.clear(); + CurrentOpAttachmentsSize = 0; + + ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); + if (PendingBlockSize == m_UsableBlockSize || PendingChunkHashes.size() == m_Config.MaxChunksPerBlock) + { + OnNewBlock(std::move(PendingChunkHashes)); + PendingChunkHashes.clear(); + PendingBlockSize = 0; + } + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + } + else if (PendingBlockSize > (m_UsableBlockSize * 3) / 4) + { + // Our op does not fit in the current block and the current block is over 75% full, so let's move our op + // to the next block... + ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); + ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); + OnNewBlock(std::move(PendingChunkHashes)); + PendingChunkHashes.clear(); + PendingBlockSize = 0; + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + } + else + { + // Fit as many as possible in current block... 
+ // I think this is dead code - either the CurrentOpRawHashes fills an entire block and the + // CurrentOpFillFullBlock is set, or the + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + + size_t AddedChunkCount = 0; + uint64_t AddedChunkSize = 0; + + for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++) + { + uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex]; + if (PendingBlockSize + ChunkSize > m_UsableBlockSize) + { + break; + } + if (PendingChunkHashes.size() == m_Config.MaxChunksPerBlock) + { + break; + } + PendingBlockSize += ChunkSize; + PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]); + AddedChunkSize += ChunkSize; + AddedChunkCount++; + + ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); + } + + ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); + + ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size()); + + OnNewBlock(std::move(PendingChunkHashes)); + PendingChunkHashes.clear(); + PendingBlockSize = 0; + + CurrentOpRawHashes.erase(CurrentOpRawHashes.begin(), CurrentOpRawHashes.begin() + AddedChunkCount); + CurrentOpChunkSizes.erase(CurrentOpChunkSizes.begin(), CurrentOpChunkSizes.begin() + AddedChunkCount); + CurrentOpAttachmentsSize -= AddedChunkSize; + + ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + } + + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + } + } + if (!PendingChunkHashes.empty()) + { + ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock); + + ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize); + ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock); + OnNewBlock(std::move(PendingChunkHashes)); + PendingChunkHashes.clear(); + } + } + + private: + static uint64_t CalculateUsableBlockSize(const Configuration& Config) + { + ZEN_ASSERT(Config.MaxChunksPerBlock > 0); + ZEN_ASSERT(Config.MaxChunkEmbedSize > 0); + uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(Config.MaxChunksPerBlock) + + MeasureVarUInt(Config.MaxChunkEmbedSize) * Config.MaxChunksPerBlock; + ZEN_ASSERT(Config.MaxBlockSize > MaxHeaderSize); + return Config.MaxBlockSize - MaxHeaderSize; + } + + const Configuration m_Config; + const uint64_t m_UsableBlockSize = 0; + }; + RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) { using namespace std::literals; @@ -1269,14 +1485,14 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<ChunkBlockDescription>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void AsyncCreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + 
std::vector<ChunkBlockDescription>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork( @@ -2673,297 +2889,73 @@ BuildContainer(CidStore& ChunkStore, uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs(); try { - auto NewBlock = - [&WorkerPool, BuildBlocks, &BlockCreateLatch, &BlocksLock, &Blocks, &AsyncOnBlock, &OnBlockChunks, &RemoteResult]( - std::vector<IoHash>&& ChunkRawHashes, - const std::function<FetchChunkFunc(const IoHash&)>& FetchResolveFunc) { - size_t ChunkCount = ChunkRawHashes.size(); - std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; - ChunksInBlock.reserve(ChunkCount); - - for (const IoHash& AttachmentHash : ChunkRawHashes) - { - ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchResolveFunc(AttachmentHash))); - } - - size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); - if (BuildBlocks) - { -#if 0 - ChunkBlockDescription Block; - CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(ChunksInBlock), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); - ZEN_ASSERT_SLOW(BlockHash != IoHash::Zero); - ZEN_UNUSED(BlockHash); - { - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - // RwLock::SharedLockScope __(SectionsLock); - Blocks[BlockIndex] = Block; - } - // uint64_t BlockSize = CompressedBlock.GetCompressedSize(); - ZEN_INFO("Generated block {} with {} chunks ({})", BlockHash, ChunkCount, NiceBytes(CompressedBlock.GetCompressedSize())); - - AsyncOnBlock(std::move(CompressedBlock), std::move(Block)); -#else - remotestore_impl::CreateBlock(WorkerPool, - BlockCreateLatch, - std::move(ChunksInBlock), - BlocksLock, - Blocks, - BlockIndex, - AsyncOnBlock, - RemoteResult); - // ComposedBlocks++; -#endif - } - else - { - ZEN_INFO("Bulk group {} attachments", ChunkCount); - - // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index - RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); - OnBlockChunks(std::move(ChunksInBlock)); - } - // uint64_t NowMS = Timer.GetElapsedTimeMs(); - // ZEN_INFO("Assembled block {} with {} chunks in {} ({})", - // BlockIndex, - // ChunkCount, - // NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS), - // NiceBytes(BlockSize)); - // FetchAttachmentsStartMS = NowMS; - // ComposedBlocks++; - }; - - auto ComposeBlocks = [&ChunksAssembled, &ComposedBlocks, ChunkAssembleCount, &NewBlock]( - uint64_t MaxBlockSize, - uint64_t MaxChunksPerBlock, - uint64_t MaxChunkEmbedSize, - std::vector<IoHash> AttachmentHashes, - std::vector<uint64_t> AttachmentSizes, - std::vector<Oid> AttachmentKeys, - std::function<FetchChunkFunc(const IoHash& AttachmentHash)>&& FetchResolveFunc, - JobContext* OptionalContext, - remotestore_impl::AsyncRemoteResult& RemoteResult) { - ZEN_ASSERT(AttachmentHashes.size() == AttachmentSizes.size()); - ZEN_ASSERT(AttachmentHashes.size() == AttachmentKeys.size()); - ZEN_ASSERT(FetchResolveFunc); - - uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(MaxChunksPerBlock) + - MeasureVarUInt(MaxChunkEmbedSize) * MaxChunksPerBlock; - uint64_t UsableBlockSize = MaxBlockSize - MaxHeaderSize; - - std::vector<IoHash> PendingChunkHashes; - uint64_t PendingBlockSize = 0; - - size_t SortedUploadAttachmentsIndex = 0; - - Stopwatch AssembleBlocksProgressTimer; - 
while (SortedUploadAttachmentsIndex < AttachmentHashes.size()) + Stopwatch AssembleBlocksProgressTimer; + remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{ + .MaxBlockSize = MaxBlockSize, + .MaxChunksPerBlock = MaxChunksPerBlock, + .MaxChunkEmbedSize = MaxChunkEmbedSize, + .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }}); + + auto OnNewBlock = [&WorkerPool, + BuildBlocks, + &AssembleBlocksProgressTimer, + &BlockCreateLatch, + &BlocksLock, + &Blocks, + &AsyncOnBlock, + &OnBlockChunks, + ChunkAssembleCount, + &ChunksAssembled, + &ComposedBlocks, + OptionalContext, + &RemoteResult](std::vector<IoHash>&& ChunkRawHashes, + const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) { + size_t ChunkCount = ChunkRawHashes.size(); + std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock; + ChunksInBlock.reserve(ChunkCount); + + for (const IoHash& AttachmentHash : ChunkRawHashes) { - if (remotestore_impl::IsCancelled(OptionalContext)) - { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage( - OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - break; - } - - const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex]; - const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex]; - uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex]; - - std::vector<IoHash> CurrentOpRawHashes; - CurrentOpRawHashes.push_back(FirstAttachmentHash); - - std::vector<uint64_t> CurrentOpChunkSizes; - CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize); - - bool CurrentOpFillFullBlock = false; - - while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size()) - { - size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size(); - const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex]; - if (NextAttachmentOpKey != FirstAttachmentOpKey) - { - break; - } - const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex]; - uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex]; - - if (CurrentOpAttachmentsSize + NextOpAttachmentSize > UsableBlockSize) - { - CurrentOpFillFullBlock = true; - break; - } - CurrentOpRawHashes.push_back(NextAttachmentHash); - CurrentOpChunkSizes.push_back(NextOpAttachmentSize); - CurrentOpAttachmentsSize += NextOpAttachmentSize; - - if (CurrentOpRawHashes.size() == MaxChunksPerBlock) - { - CurrentOpFillFullBlock = true; - break; - } - } - - ChunksAssembled += CurrentOpChunkSizes.size(); - SortedUploadAttachmentsIndex += CurrentOpChunkSizes.size(); - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); - // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(), - // CurrentOpChunkSizes.end(), uint64_t(0u))); - - while (!CurrentOpChunkSizes.empty()) - { - ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); - // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(), - // CurrentOpChunkSizes.end(), uint64_t(0u))); - - size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size(); - ZEN_ASSERT(CurrentOpAttachmentsSize <= 
UsableBlockSize); - ZEN_ASSERT(CurrentOpAttachmentCount <= MaxChunksPerBlock); - - if (CurrentOpFillFullBlock) - { - ZEN_ASSERT(CurrentOpRawHashes.size() <= MaxChunksPerBlock); - ZEN_ASSERT(CurrentOpAttachmentsSize <= UsableBlockSize); - NewBlock(std::move(CurrentOpRawHashes), /*CurrentOpAttachmentsSize, */ FetchResolveFunc); - CurrentOpChunkSizes.clear(); - CurrentOpAttachmentsSize = 0; - ComposedBlocks++; - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - } - else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= UsableBlockSize && - (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= MaxChunksPerBlock) - { - // All attachments for Op fits in the current block... - PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end()); - PendingBlockSize += CurrentOpAttachmentsSize; - - CurrentOpRawHashes.clear(); - CurrentOpChunkSizes.clear(); - CurrentOpAttachmentsSize = 0; - - ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); - ZEN_ASSERT(PendingBlockSize <= UsableBlockSize); - if (PendingBlockSize == UsableBlockSize || PendingChunkHashes.size() == MaxChunksPerBlock) - { - NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); - PendingChunkHashes.clear(); - PendingBlockSize = 0; - ComposedBlocks++; - } - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - } - else if (PendingBlockSize > (UsableBlockSize * 3) / 4) - { - // Our ops does not fit in the current block and the current block is using 75% of max size so lets move our op - // to the next block... - ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); - ZEN_ASSERT(PendingBlockSize <= UsableBlockSize); - NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); - PendingChunkHashes.clear(); - PendingBlockSize = 0; - ComposedBlocks++; - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - } - else - { - // Fit as many as possible in current block... 
- // I think this is dead code - either the CurrentOpRawHashes fills an entire block and the - // CurrentOpFillFullBlock is set, or the - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - - size_t AddedChunkCount = 0; - uint64_t AddedChunkSize = 0; - - for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++) - { - uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex]; - if (PendingBlockSize + ChunkSize > UsableBlockSize) - { - break; - } - if (PendingChunkHashes.size() == MaxChunksPerBlock) - { - break; - } - PendingBlockSize += ChunkSize; - PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]); - AddedChunkSize += ChunkSize; - AddedChunkCount++; - - ZEN_ASSERT(PendingBlockSize <= UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); - } - - ZEN_ASSERT(PendingBlockSize <= UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); - - ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size()); - - NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize,*/ FetchResolveFunc); - PendingChunkHashes.clear(); - PendingBlockSize = 0; - ComposedBlocks++; - - CurrentOpRawHashes.erase(CurrentOpRawHashes.begin(), CurrentOpRawHashes.begin() + AddedChunkCount); - CurrentOpChunkSizes.erase(CurrentOpChunkSizes.begin(), CurrentOpChunkSizes.begin() + AddedChunkCount); - CurrentOpAttachmentsSize -= AddedChunkSize; - - ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size()); - // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(), - // CurrentOpChunkSizes.end(), uint64_t(0u))); - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - } - - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); - } + ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash))); + } - if (ChunksAssembled % 1000 == 0) - { - remotestore_impl::ReportProgress( - OptionalContext, - "Assembling blocks"sv, - fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), - ChunkAssembleCount, - ChunkAssembleCount - ChunksAssembled, - AssembleBlocksProgressTimer.GetElapsedTimeMs()); - } + size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks); + if (BuildBlocks) + { + remotestore_impl::AsyncCreateBlock(WorkerPool, + BlockCreateLatch, + std::move(ChunksInBlock), + BlocksLock, + Blocks, + BlockIndex, + AsyncOnBlock, + RemoteResult); } - if (!PendingChunkHashes.empty()) + else { - ZEN_ASSERT(PendingBlockSize < UsableBlockSize); - ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock); + ZEN_INFO("Bulk group {} attachments", ChunkCount); - size_t PendingChunkCount = PendingChunkHashes.size(); - ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock); - ZEN_ASSERT(PendingBlockSize <= UsableBlockSize); - NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc); - PendingChunkHashes.clear(); - ComposedBlocks++; + // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index + RwLock::SharedLockScope _(BlocksLock); + Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes); + OnBlockChunks(std::move(ChunksInBlock)); + } + + ChunksAssembled += ChunkCount; + ComposedBlocks++; - ChunksAssembled += PendingChunkCount; + if (ChunksAssembled % 1000 == 0) + { + 
remotestore_impl::ReportProgress( + OptionalContext, + "Assembling blocks"sv, + fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks), + ChunkAssembleCount, + ChunkAssembleCount - ChunksAssembled, + AssembleBlocksProgressTimer.GetElapsedTimeMs()); } }; - Stopwatch AssembleBlocksProgressTimer; { std::vector<IoHash> AttachmentHashes; AttachmentHashes.reserve(SortedUploadAttachments.size()); @@ -3045,15 +3037,12 @@ BuildContainer(CidStore& ChunkStore, } }; - ComposeBlocks(MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - AttachmentHashes, - AttachmentSizes, - AttachmentKeys, - FetchWholeAttachmentResolver, - OptionalContext, - RemoteResult); + Composer.Compose(AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { + OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver); + }); } { @@ -3135,15 +3124,12 @@ BuildContainer(CidStore& ChunkStore, } }; - ComposeBlocks(MaxBlockSize, - MaxChunksPerBlock, - MaxChunkEmbedSize, - AttachmentHashes, - AttachmentSizes, - AttachmentKeys, - ChunkedFileAttachmentResolver, - OptionalContext, - RemoteResult); + Composer.Compose(AttachmentHashes, + AttachmentSizes, + AttachmentKeys, + [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) { + OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver); + }); } if (ChunkAssembleCount > 0) |
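
One detail that is easy to miss in the refactor: the byte budget the composer packs against is not `MaxBlockSize` itself but `MaxBlockSize` minus a worst-case header reservation, computed once in `CalculateUsableBlockSize`. The sketch below reproduces that arithmetic under stated assumptions: `MeasureVarUIntLEB128` stands in for the codebase's `MeasureVarUInt` and assumes a LEB128-style encoding (7 payload bits per byte), which may differ from the actual varint format, and the encoder header size is taken as a plain parameter instead of calling `CompressedBuffer::GetHeaderSizeForNoneEncoder()`.

```cpp
// Hypothetical reconstruction of the usable-block-size computation.
// MeasureVarUIntLEB128 assumes a LEB128-style varint (7 payload bits per
// byte); the codebase's MeasureVarUInt may use a different encoding.
#include <cstdint>

inline uint64_t MeasureVarUIntLEB128(uint64_t Value)
{
    uint64_t Bytes = 1;
    while (Value >>= 7) // one extra byte for each additional 7 bits
    {
        ++Bytes;
    }
    return Bytes;
}

// Reserve worst-case header space up front: a fixed encoder header, one
// varint for the chunk count, and one varint per chunk size measured as if
// every chunk were as large as MaxChunkEmbedSize. The caller must guarantee
// MaxBlockSize > MaxHeaderSize (the real code asserts this).
inline uint64_t UsableBlockSize(uint64_t MaxBlockSize,
                                uint64_t MaxChunksPerBlock,
                                uint64_t MaxChunkEmbedSize,
                                uint64_t EncoderHeaderSize)
{
    uint64_t MaxHeaderSize = EncoderHeaderSize + MeasureVarUIntLEB128(MaxChunksPerBlock) +
                             MeasureVarUIntLEB128(MaxChunkEmbedSize) * MaxChunksPerBlock;
    return MaxBlockSize - MaxHeaderSize;
}
```

Reserving for the worst case (every chunk as large as `MaxChunkEmbedSize`) is what lets `Compose` assert `PendingBlockSize <= m_UsableBlockSize` without re-measuring header growth as chunks are added.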