author    Dan Engelbrecht <[email protected]>    2026-03-14 12:09:44 +0100
committer Dan Engelbrecht <[email protected]>    2026-03-14 14:19:08 +0100
commit    a214747b9339a2701831f01b4034bbbf223ac194 (patch)
tree      390f093d570bd31df2d0b451bca41815e0f13fc8 /src
parent    exception guards in httpprojectstore (diff)
refactored out block composer
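
Extracts the inline ComposeBlocks lambda in BuildContainer into a reusable
remotestore_impl::BlockComposer class. The block packing logic is unchanged,
while progress reporting moves into the caller's OnNewBlock callback.
Minimal usage sketch (the limits below are illustrative placeholders, not
values taken from this change):

    remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{
        .MaxBlockSize      = 8ull << 20,  // placeholder limit
        .MaxChunksPerBlock = 1024,        // placeholder limit
        .MaxChunkEmbedSize = 64,          // placeholder limit
        .IsCancelledFunc   = [] { return false; }});

    Composer.Compose(AttachmentHashes, AttachmentSizes, AttachmentKeys,
                     [](std::vector<IoHash>&& ChunkRawHashes) {
                         // Invoked once per composed block with the chunk hashes it contains.
                     });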
Diffstat (limited to 'src')
-rw-r--r--  src/zenremotestore/projectstore/remoteprojectstore.cpp | 602
1 file changed, 294 insertions, 308 deletions
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 123e8d3be..ed734aa21 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -66,6 +66,8 @@ namespace zen {
}
*/
namespace remotestore_impl {
+ using namespace std::literals;
+
////////////////////////////// AsyncRemoteResult
struct AsyncRemoteResult
@@ -170,6 +172,220 @@ namespace remotestore_impl {
return BlockIndex;
}
+ class BlockComposer
+ {
+ public:
+ struct Configuration
+ {
+ uint64_t MaxBlockSize = 0;
+ uint64_t MaxChunksPerBlock = 0;
+ uint64_t MaxChunkEmbedSize = 0;
+ std::function<bool()> IsCancelledFunc;
+ };
+
+ explicit BlockComposer(const Configuration& Config) : m_Config(Config), m_UsableBlockSize(CalculateUsableBlockSize(m_Config)) {}
+
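+        // Packs attachments (pre-sorted so entries sharing an op key are adjacent) into
+        // blocks bounded by the usable block size and MaxChunksPerBlock, invoking
+        // OnNewBlock once per composed block with the raw hashes of its chunks.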
+ void Compose(std::span<const IoHash> AttachmentHashes,
+ std::span<const uint64_t> AttachmentSizes,
+ std::span<const Oid> AttachmentKeys,
+ const std::function<void(std::vector<IoHash>&& ChunkRawHashes)>& OnNewBlock)
+ {
+ std::vector<IoHash> PendingChunkHashes;
+ uint64_t PendingBlockSize = 0;
+
+ size_t SortedUploadAttachmentsIndex = 0;
+
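+            // Each outer iteration gathers one contiguous run of attachments that share an op key.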
+ while (SortedUploadAttachmentsIndex < AttachmentHashes.size())
+ {
+ if (m_Config.IsCancelledFunc && m_Config.IsCancelledFunc())
+ {
+ return;
+ }
+
+ const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex];
+ const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex];
+ uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex];
+
+ std::vector<IoHash> CurrentOpRawHashes;
+ CurrentOpRawHashes.push_back(FirstAttachmentHash);
+
+ std::vector<uint64_t> CurrentOpChunkSizes;
+ CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize);
+
+ bool CurrentOpFillFullBlock = false;
+
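+                // Extend the run with subsequent attachments sharing the same op key,
+                // stopping once adding another would overflow the usable block size or
+                // hit the per-block chunk limit.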
+ while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size())
+ {
+ size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size();
+ const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex];
+ if (NextAttachmentOpKey != FirstAttachmentOpKey)
+ {
+ break;
+ }
+ const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex];
+ uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex];
+
+ if (CurrentOpAttachmentsSize + NextOpAttachmentSize > m_UsableBlockSize)
+ {
+ CurrentOpFillFullBlock = true;
+ break;
+ }
+ CurrentOpRawHashes.push_back(NextAttachmentHash);
+ CurrentOpChunkSizes.push_back(NextOpAttachmentSize);
+ CurrentOpAttachmentsSize += NextOpAttachmentSize;
+
+ if (CurrentOpRawHashes.size() == m_Config.MaxChunksPerBlock)
+ {
+ CurrentOpFillFullBlock = true;
+ break;
+ }
+ }
+
+ SortedUploadAttachmentsIndex += CurrentOpChunkSizes.size();
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
+ // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(),
+ // CurrentOpChunkSizes.end(), uint64_t(0u)));
+
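+                // Drain the gathered run: emit it as a dedicated block, fold it into the
+                // pending block, or flush the pending block and retry.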
+ while (!CurrentOpChunkSizes.empty())
+ {
+ ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
+
+ size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size();
+ ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize);
+ ZEN_ASSERT(CurrentOpAttachmentCount <= m_Config.MaxChunksPerBlock);
+
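+                    // The run already saturates a block (by size or chunk count), so it becomes a block of its own.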
+ if (CurrentOpFillFullBlock)
+ {
+ ZEN_ASSERT(CurrentOpAttachmentsSize <= m_UsableBlockSize);
+ ZEN_ASSERT(CurrentOpRawHashes.size() <= m_Config.MaxChunksPerBlock);
+ OnNewBlock(std::move(CurrentOpRawHashes));
+ CurrentOpChunkSizes.clear();
+ CurrentOpAttachmentsSize = 0;
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ }
+ else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= m_UsableBlockSize &&
+ (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= m_Config.MaxChunksPerBlock)
+ {
+                        // All attachments for the op fit in the current block...
+ PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end());
+ PendingBlockSize += CurrentOpAttachmentsSize;
+
+ CurrentOpRawHashes.clear();
+ CurrentOpChunkSizes.clear();
+ CurrentOpAttachmentsSize = 0;
+
+ ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
+ if (PendingBlockSize == m_UsableBlockSize || PendingChunkHashes.size() == m_Config.MaxChunksPerBlock)
+ {
+ OnNewBlock(std::move(PendingChunkHashes));
+ PendingChunkHashes.clear();
+ PendingBlockSize = 0;
+ }
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ }
+ else if (PendingBlockSize > (m_UsableBlockSize * 3) / 4)
+ {
+                        // Our op does not fit in the current block and the current block is using more than 75% of the
+                        // usable size, so let's move our op to the next block...
+ ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
+ ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
+ OnNewBlock(std::move(PendingChunkHashes));
+ PendingChunkHashes.clear();
+ PendingBlockSize = 0;
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ }
+ else
+ {
+ // Fit as many as possible in current block...
+                        // I think this is dead code - either CurrentOpRawHashes fills an entire block and
+                        // CurrentOpFillFullBlock is set, or the op fits in the pending block, or the pending block is
+                        // over 75% full and is flushed by the branch above.
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+
+ size_t AddedChunkCount = 0;
+ uint64_t AddedChunkSize = 0;
+
+ for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++)
+ {
+ uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex];
+ if (PendingBlockSize + ChunkSize > m_UsableBlockSize)
+ {
+ break;
+ }
+ if (PendingChunkHashes.size() == m_Config.MaxChunksPerBlock)
+ {
+ break;
+ }
+ PendingBlockSize += ChunkSize;
+ PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]);
+ AddedChunkSize += ChunkSize;
+ AddedChunkCount++;
+
+ ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
+ }
+
+ ZEN_ASSERT(PendingBlockSize <= m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() <= m_Config.MaxChunksPerBlock);
+
+ ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size());
+
+ OnNewBlock(std::move(PendingChunkHashes));
+ PendingChunkHashes.clear();
+ PendingBlockSize = 0;
+
+ CurrentOpRawHashes.erase(CurrentOpRawHashes.begin(), CurrentOpRawHashes.begin() + AddedChunkCount);
+ CurrentOpChunkSizes.erase(CurrentOpChunkSizes.begin(), CurrentOpChunkSizes.begin() + AddedChunkCount);
+ CurrentOpAttachmentsSize -= AddedChunkSize;
+
+ ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ }
+
+ ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+ ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ }
+ }
+ if (!PendingChunkHashes.empty())
+ {
+                ZEN_ASSERT(PendingBlockSize < m_UsableBlockSize);
+                ZEN_ASSERT(PendingChunkHashes.size() < m_Config.MaxChunksPerBlock);
+ OnNewBlock(std::move(PendingChunkHashes));
+ PendingChunkHashes.clear();
+ }
+ }
+
+ private:
+ static uint64_t CalculateUsableBlockSize(const Configuration& Config)
+ {
+ ZEN_ASSERT(Config.MaxChunksPerBlock > 0);
+ ZEN_ASSERT(Config.MaxChunkEmbedSize > 0);
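+            // Reserve worst-case header space: the no-encoder header, a varint chunk
+            // count, and one varint chunk size per chunk at the maximum chunk count.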
+ uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(Config.MaxChunksPerBlock) +
+ MeasureVarUInt(Config.MaxChunkEmbedSize) * Config.MaxChunksPerBlock;
+ ZEN_ASSERT(Config.MaxBlockSize > MaxHeaderSize);
+ return Config.MaxBlockSize - MaxHeaderSize;
+ }
+
+ const Configuration m_Config;
+ const uint64_t m_UsableBlockSize = 0;
+ };
+
RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext)
{
using namespace std::literals;
@@ -1269,14 +1485,14 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<ChunkBlockDescription>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
+ void AsyncCreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<ChunkBlockDescription>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork(
@@ -2673,297 +2889,73 @@ BuildContainer(CidStore& ChunkStore,
uint64_t CreateBlocksStartMS = Timer.GetElapsedTimeMs();
try
{
- auto NewBlock =
- [&WorkerPool, BuildBlocks, &BlockCreateLatch, &BlocksLock, &Blocks, &AsyncOnBlock, &OnBlockChunks, &RemoteResult](
- std::vector<IoHash>&& ChunkRawHashes,
- const std::function<FetchChunkFunc(const IoHash&)>& FetchResolveFunc) {
- size_t ChunkCount = ChunkRawHashes.size();
- std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
- ChunksInBlock.reserve(ChunkCount);
-
- for (const IoHash& AttachmentHash : ChunkRawHashes)
- {
- ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchResolveFunc(AttachmentHash)));
- }
-
- size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
- if (BuildBlocks)
- {
-#if 0
- ChunkBlockDescription Block;
- CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(ChunksInBlock), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
- ZEN_ASSERT_SLOW(BlockHash != IoHash::Zero);
- ZEN_UNUSED(BlockHash);
- {
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- // RwLock::SharedLockScope __(SectionsLock);
- Blocks[BlockIndex] = Block;
- }
- // uint64_t BlockSize = CompressedBlock.GetCompressedSize();
- ZEN_INFO("Generated block {} with {} chunks ({})", BlockHash, ChunkCount, NiceBytes(CompressedBlock.GetCompressedSize()));
-
- AsyncOnBlock(std::move(CompressedBlock), std::move(Block));
-#else
- remotestore_impl::CreateBlock(WorkerPool,
- BlockCreateLatch,
- std::move(ChunksInBlock),
- BlocksLock,
- Blocks,
- BlockIndex,
- AsyncOnBlock,
- RemoteResult);
- // ComposedBlocks++;
-#endif
- }
- else
- {
- ZEN_INFO("Bulk group {} attachments", ChunkCount);
-
- // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
- RwLock::SharedLockScope _(BlocksLock);
- Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
- OnBlockChunks(std::move(ChunksInBlock));
- }
- // uint64_t NowMS = Timer.GetElapsedTimeMs();
- // ZEN_INFO("Assembled block {} with {} chunks in {} ({})",
- // BlockIndex,
- // ChunkCount,
- // NiceTimeSpanMs(NowMS - FetchAttachmentsStartMS),
- // NiceBytes(BlockSize));
- // FetchAttachmentsStartMS = NowMS;
- // ComposedBlocks++;
- };
-
- auto ComposeBlocks = [&ChunksAssembled, &ComposedBlocks, ChunkAssembleCount, &NewBlock](
- uint64_t MaxBlockSize,
- uint64_t MaxChunksPerBlock,
- uint64_t MaxChunkEmbedSize,
- std::vector<IoHash> AttachmentHashes,
- std::vector<uint64_t> AttachmentSizes,
- std::vector<Oid> AttachmentKeys,
- std::function<FetchChunkFunc(const IoHash& AttachmentHash)>&& FetchResolveFunc,
- JobContext* OptionalContext,
- remotestore_impl::AsyncRemoteResult& RemoteResult) {
- ZEN_ASSERT(AttachmentHashes.size() == AttachmentSizes.size());
- ZEN_ASSERT(AttachmentHashes.size() == AttachmentKeys.size());
- ZEN_ASSERT(FetchResolveFunc);
-
- uint64_t MaxHeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + MeasureVarUInt(MaxChunksPerBlock) +
- MeasureVarUInt(MaxChunkEmbedSize) * MaxChunksPerBlock;
- uint64_t UsableBlockSize = MaxBlockSize - MaxHeaderSize;
-
- std::vector<IoHash> PendingChunkHashes;
- uint64_t PendingBlockSize = 0;
-
- size_t SortedUploadAttachmentsIndex = 0;
-
- Stopwatch AssembleBlocksProgressTimer;
- while (SortedUploadAttachmentsIndex < AttachmentHashes.size())
+ Stopwatch AssembleBlocksProgressTimer;
+ remotestore_impl::BlockComposer Composer(remotestore_impl::BlockComposer::Configuration{
+ .MaxBlockSize = MaxBlockSize,
+ .MaxChunksPerBlock = MaxChunksPerBlock,
+ .MaxChunkEmbedSize = MaxChunkEmbedSize,
+ .IsCancelledFunc = [OptionalContext]() { return remotestore_impl::IsCancelled(OptionalContext); }});
+
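+        // For each composed block, OnNewBlock resolves fetchers for the block's chunks and
+        // either schedules async block creation or records the raw chunk hashes, then reports progress.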
+ auto OnNewBlock = [&WorkerPool,
+ BuildBlocks,
+ &AssembleBlocksProgressTimer,
+ &BlockCreateLatch,
+ &BlocksLock,
+ &Blocks,
+ &AsyncOnBlock,
+ &OnBlockChunks,
+ ChunkAssembleCount,
+ &ChunksAssembled,
+ &ComposedBlocks,
+ OptionalContext,
+ &RemoteResult](std::vector<IoHash>&& ChunkRawHashes,
+ const std::function<FetchChunkFunc(const IoHash& AttachmentHash)>& FetchAttachmentResolver) {
+ size_t ChunkCount = ChunkRawHashes.size();
+ std::vector<std::pair<IoHash, FetchChunkFunc>> ChunksInBlock;
+ ChunksInBlock.reserve(ChunkCount);
+
+ for (const IoHash& AttachmentHash : ChunkRawHashes)
{
- if (remotestore_impl::IsCancelled(OptionalContext))
- {
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(
- OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- break;
- }
-
- const IoHash& FirstAttachmentHash = AttachmentHashes[SortedUploadAttachmentsIndex];
- const Oid FirstAttachmentOpKey = AttachmentKeys[SortedUploadAttachmentsIndex];
- uint64_t CurrentOpAttachmentsSize = AttachmentSizes[SortedUploadAttachmentsIndex];
-
- std::vector<IoHash> CurrentOpRawHashes;
- CurrentOpRawHashes.push_back(FirstAttachmentHash);
-
- std::vector<uint64_t> CurrentOpChunkSizes;
- CurrentOpChunkSizes.push_back(CurrentOpAttachmentsSize);
-
- bool CurrentOpFillFullBlock = false;
-
- while (SortedUploadAttachmentsIndex + CurrentOpRawHashes.size() < AttachmentHashes.size())
- {
- size_t NextSortedUploadAttachmentsIndex = SortedUploadAttachmentsIndex + CurrentOpChunkSizes.size();
- const Oid NextAttachmentOpKey = AttachmentKeys[NextSortedUploadAttachmentsIndex];
- if (NextAttachmentOpKey != FirstAttachmentOpKey)
- {
- break;
- }
- const IoHash& NextAttachmentHash = AttachmentHashes[NextSortedUploadAttachmentsIndex];
- uint64_t NextOpAttachmentSize = AttachmentSizes[NextSortedUploadAttachmentsIndex];
-
- if (CurrentOpAttachmentsSize + NextOpAttachmentSize > UsableBlockSize)
- {
- CurrentOpFillFullBlock = true;
- break;
- }
- CurrentOpRawHashes.push_back(NextAttachmentHash);
- CurrentOpChunkSizes.push_back(NextOpAttachmentSize);
- CurrentOpAttachmentsSize += NextOpAttachmentSize;
-
- if (CurrentOpRawHashes.size() == MaxChunksPerBlock)
- {
- CurrentOpFillFullBlock = true;
- break;
- }
- }
-
- ChunksAssembled += CurrentOpChunkSizes.size();
- SortedUploadAttachmentsIndex += CurrentOpChunkSizes.size();
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
- // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(),
- // CurrentOpChunkSizes.end(), uint64_t(0u)));
-
- while (!CurrentOpChunkSizes.empty())
- {
- ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
- // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(),
- // CurrentOpChunkSizes.end(), uint64_t(0u)));
-
- size_t CurrentOpAttachmentCount = CurrentOpChunkSizes.size();
- ZEN_ASSERT(CurrentOpAttachmentsSize <= UsableBlockSize);
- ZEN_ASSERT(CurrentOpAttachmentCount <= MaxChunksPerBlock);
-
- if (CurrentOpFillFullBlock)
- {
- ZEN_ASSERT(CurrentOpRawHashes.size() <= MaxChunksPerBlock);
- ZEN_ASSERT(CurrentOpAttachmentsSize <= UsableBlockSize);
- NewBlock(std::move(CurrentOpRawHashes), /*CurrentOpAttachmentsSize, */ FetchResolveFunc);
- CurrentOpChunkSizes.clear();
- CurrentOpAttachmentsSize = 0;
- ComposedBlocks++;
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- }
- else if ((PendingBlockSize + CurrentOpAttachmentsSize) <= UsableBlockSize &&
- (PendingChunkHashes.size() + CurrentOpAttachmentCount) <= MaxChunksPerBlock)
- {
- // All attachments for Op fits in the current block...
- PendingChunkHashes.insert(PendingChunkHashes.end(), CurrentOpRawHashes.begin(), CurrentOpRawHashes.end());
- PendingBlockSize += CurrentOpAttachmentsSize;
-
- CurrentOpRawHashes.clear();
- CurrentOpChunkSizes.clear();
- CurrentOpAttachmentsSize = 0;
-
- ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
- ZEN_ASSERT(PendingBlockSize <= UsableBlockSize);
- if (PendingBlockSize == UsableBlockSize || PendingChunkHashes.size() == MaxChunksPerBlock)
- {
- NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
- PendingChunkHashes.clear();
- PendingBlockSize = 0;
- ComposedBlocks++;
- }
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- }
- else if (PendingBlockSize > (UsableBlockSize * 3) / 4)
- {
- // Our ops does not fit in the current block and the current block is using 75% of max size so lets move our op
- // to the next block...
- ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
- ZEN_ASSERT(PendingBlockSize <= UsableBlockSize);
- NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
- PendingChunkHashes.clear();
- PendingBlockSize = 0;
- ComposedBlocks++;
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- }
- else
- {
- // Fit as many as possible in current block...
- // I think this is dead code - either the CurrentOpRawHashes fills an entire block and the
- // CurrentOpFillFullBlock is set, or the
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
-
- size_t AddedChunkCount = 0;
- uint64_t AddedChunkSize = 0;
-
- for (size_t CurrentChunkIndex = 0; CurrentChunkIndex < CurrentOpRawHashes.size(); CurrentChunkIndex++)
- {
- uint64_t ChunkSize = CurrentOpChunkSizes[CurrentChunkIndex];
- if (PendingBlockSize + ChunkSize > UsableBlockSize)
- {
- break;
- }
- if (PendingChunkHashes.size() == MaxChunksPerBlock)
- {
- break;
- }
- PendingBlockSize += ChunkSize;
- PendingChunkHashes.push_back(CurrentOpRawHashes[CurrentChunkIndex]);
- AddedChunkSize += ChunkSize;
- AddedChunkCount++;
-
- ZEN_ASSERT(PendingBlockSize <= UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
- }
-
- ZEN_ASSERT(PendingBlockSize <= UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
-
- ZEN_ASSERT(AddedChunkCount < CurrentOpRawHashes.size());
-
- NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize,*/ FetchResolveFunc);
- PendingChunkHashes.clear();
- PendingBlockSize = 0;
- ComposedBlocks++;
-
- CurrentOpRawHashes.erase(CurrentOpRawHashes.begin(), CurrentOpRawHashes.begin() + AddedChunkCount);
- CurrentOpChunkSizes.erase(CurrentOpChunkSizes.begin(), CurrentOpChunkSizes.begin() + AddedChunkCount);
- CurrentOpAttachmentsSize -= AddedChunkSize;
-
- ZEN_ASSERT(CurrentOpRawHashes.size() == CurrentOpChunkSizes.size());
- // ZEN_ASSERT(CurrentOpAttachmentsSize == std::accumulate(CurrentOpChunkSizes.begin(),
- // CurrentOpChunkSizes.end(), uint64_t(0u)));
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- }
-
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
- }
+ ChunksInBlock.emplace_back(std::make_pair(AttachmentHash, FetchAttachmentResolver(AttachmentHash)));
+ }
- if (ChunksAssembled % 1000 == 0)
- {
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Assembling blocks"sv,
- fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
- ChunkAssembleCount,
- ChunkAssembleCount - ChunksAssembled,
- AssembleBlocksProgressTimer.GetElapsedTimeMs());
- }
+ size_t BlockIndex = remotestore_impl::AddBlock(BlocksLock, Blocks);
+ if (BuildBlocks)
+ {
+ remotestore_impl::AsyncCreateBlock(WorkerPool,
+ BlockCreateLatch,
+ std::move(ChunksInBlock),
+ BlocksLock,
+ Blocks,
+ BlockIndex,
+ AsyncOnBlock,
+ RemoteResult);
}
- if (!PendingChunkHashes.empty())
+ else
{
- ZEN_ASSERT(PendingBlockSize < UsableBlockSize);
- ZEN_ASSERT(PendingChunkHashes.size() < MaxChunksPerBlock);
+ ZEN_INFO("Bulk group {} attachments", ChunkCount);
- size_t PendingChunkCount = PendingChunkHashes.size();
- ZEN_ASSERT(PendingChunkHashes.size() <= MaxChunksPerBlock);
- ZEN_ASSERT(PendingBlockSize <= UsableBlockSize);
- NewBlock(std::move(PendingChunkHashes), /*PendingBlockSize, */ FetchResolveFunc);
- PendingChunkHashes.clear();
- ComposedBlocks++;
+ // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
+ RwLock::SharedLockScope _(BlocksLock);
+ Blocks[BlockIndex].ChunkRawHashes = std::move(ChunkRawHashes);
+ OnBlockChunks(std::move(ChunksInBlock));
+ }
+
+ ChunksAssembled += ChunkCount;
+ ComposedBlocks++;
- ChunksAssembled += PendingChunkCount;
+ if (ChunksAssembled % 1000 == 0)
+ {
+ remotestore_impl::ReportProgress(
+ OptionalContext,
+ "Assembling blocks"sv,
+ fmt::format("{} attachments processed, {} blocks assembled", ChunksAssembled, ComposedBlocks),
+ ChunkAssembleCount,
+ ChunkAssembleCount - ChunksAssembled,
+ AssembleBlocksProgressTimer.GetElapsedTimeMs());
}
};
- Stopwatch AssembleBlocksProgressTimer;
{
std::vector<IoHash> AttachmentHashes;
AttachmentHashes.reserve(SortedUploadAttachments.size());
@@ -3045,15 +3037,12 @@ BuildContainer(CidStore& ChunkStore,
}
};
- ComposeBlocks(MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- AttachmentHashes,
- AttachmentSizes,
- AttachmentKeys,
- FetchWholeAttachmentResolver,
- OptionalContext,
- RemoteResult);
+ Composer.Compose(AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ [&OnNewBlock, &FetchWholeAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
+ OnNewBlock(std::move(ChunkRawHashes), FetchWholeAttachmentResolver);
+ });
}
{
@@ -3135,15 +3124,12 @@ BuildContainer(CidStore& ChunkStore,
}
};
- ComposeBlocks(MaxBlockSize,
- MaxChunksPerBlock,
- MaxChunkEmbedSize,
- AttachmentHashes,
- AttachmentSizes,
- AttachmentKeys,
- ChunkedFileAttachmentResolver,
- OptionalContext,
- RemoteResult);
+ Composer.Compose(AttachmentHashes,
+ AttachmentSizes,
+ AttachmentKeys,
+ [&OnNewBlock, &ChunkedFileAttachmentResolver](std::vector<IoHash>&& ChunkRawHashes) {
+ OnNewBlock(std::move(ChunkRawHashes), ChunkedFileAttachmentResolver);
+ });
}
if (ChunkAssembleCount > 0)