From da4826d560a66b8a5f09158a93c83caa12348c7b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 20 Feb 2026 10:32:32 +0100 Subject: move partial chunk block anailsys to chunkblock.h/cpp (#767) --- .../builds/buildstorageoperations.cpp | 882 ++++++--------------- src/zenremotestore/chunking/chunkblock.cpp | 540 ++++++++++++- .../zenremotestore/builds/buildstorageoperations.h | 50 +- .../include/zenremotestore/chunking/chunkblock.h | 94 +++ 4 files changed, 887 insertions(+), 679 deletions(-) diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 72e06767a..4f1b07c37 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -899,343 +899,213 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) CheckRequiredDiskSpace(RemotePathToRemoteIndex); + BlobsExistsResult ExistsResult; { - ZEN_TRACE_CPU("WriteChunks"); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); + ChunkBlockAnalyser BlockAnalyser(m_LogOutput, + m_BlockDescriptions, + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose}); - Stopwatch WriteTimer; + std::vector NeededBlocks = BlockAnalyser.GetNeeded( + m_RemoteLookup.ChunkHashToChunkIndex, + [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredWrittenBytesPerSecond; - - std::unique_ptr WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - struct LooseChunkHashWorkData - { - std::vector ChunkTargetPtrs; - uint32_t RemoteChunkIndex = (uint32_t)-1; - }; - - std::vector LooseChunkHashWorks; - TotalPartWriteCount += CopyChunkDatas.size(); - TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); + std::vector FetchBlockIndexes; + std::vector CachedChunkBlockIndexes; - for (const IoHash ChunkHash : m_LooseChunkHashes) { - auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + ZEN_TRACE_CPU("BlockCacheFileExists"); + for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { - if (m_Options.IsVerbose) + if (m_Options.PrimeCacheOnly) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); - } - continue; - } - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) - { - std::vector ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (ChunkTargetPtrs.empty()) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); - } + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); } else { - TotalRequestCount++; - TotalPartWriteCount++; - LooseChunkHashWorks.push_back( - LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); - } - } - } - - uint32_t BlockCount = gsl::narrow(m_BlockDescriptions.size()); - - std::vector ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false); - auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock]( - const ChunkBlockDescription& BlockDescription) { - ZEN_TRACE_CPU("GetNeededChunkBlockIndexes"); - std::vector NeededBlockChunkIndexes; - for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) - { - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; - if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + TotalPartWriteCount++; + + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) { - ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; - NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); + UsingCachedBlock = true; } } - } - else - { - ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash); + if (!UsingCachedBlock) + { + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + } } } - return NeededBlockChunkIndexes; - }; + } - std::vector CachedChunkBlockIndexes; - std::vector FetchBlockIndexes; - std::vector> AllBlockChunkIndexNeeded; + std::vector NeededLooseChunkIndexes; - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - std::vector BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); - if (!BlockChunkIndexNeeded.empty()) + NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); + for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++) { - if (m_Options.PrimeCacheOnly) + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - FetchBlockIndexes.push_back(BlockIndex); + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Skipping chunk {} due to cache reuse", + m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); + } + continue; } - else + + bool NeedsCopy = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (WriteCount == 0) { - TotalPartWriteCount++; - - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) + if (m_Options.IsVerbose) { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Skipping chunk {} due to cache reuse", + m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } } - if (!UsingCachedBlock) + else { - FetchBlockIndexes.push_back(BlockIndex); + NeededLooseChunkIndexes.push_back(LooseChunkIndex); } } } - AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); } - BlobsExistsResult ExistsResult; - if (m_Storage.BuildCacheStorage) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; - tsl::robin_set BlobHashesSet; + std::vector BlobHashes; + BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); - BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); - for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { - BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); } + for (uint32_t BlockIndex : FetchBlockIndexes) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - BlobHashesSet.insert(BlockDescription.BlockHash); + BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); } - if (!BlobHashesSet.empty()) - { - const std::vector BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); - const std::vector CacheExistsResult = - m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + const std::vector CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); - if (CacheExistsResult.size() == BlobHashes.size()) + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { - ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + if (CacheExistsResult[BlobIndex].HasBody) { - if (CacheExistsResult[BlobIndex].HasBody) - { - ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); - } + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); } } - ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Remote cache : Found {} out of {} needed blobs in {}", - ExistsResult.ExistingBlobs.size(), - BlobHashes.size(), - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); } } - std::vector BlockRangeWorks; - std::vector FullBlockWorks; + std::vector BlockPartialDownloadModes; + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { - Stopwatch Timer; - - std::vector PartialBlockIndexes; - - for (uint32_t BlockIndex : FetchBlockIndexes) + BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) + { + BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::On); + } + else + { + BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - const std::vector BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); - if (!BlockChunkIndexNeeded.empty()) + const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = - (BlockDescription.HeaderSize > 0) && - (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); - - bool AllowedToDoPartialRequest = false; - bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - switch (m_Options.PartialBlockRequestMode) - { - case EPartialBlockRequestMode::Off: - break; - case EPartialBlockRequestMode::ZenCacheOnly: - AllowedToDoPartialRequest = BlockExistInCache; - break; - case EPartialBlockRequestMode::Mixed: - case EPartialBlockRequestMode::All: - AllowedToDoPartialRequest = true; - break; - default: - ZEN_ASSERT(false); - break; - } + BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) + { + BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); + } + } + } + ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); - const uint32_t ChunkStartOffsetInBlock = - gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + ChunkBlockAnalyser::BlockResult PartialBlocks = + BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); - const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(ChunkStartOffsetInBlock)); + struct LooseChunkHashWorkData + { + std::vector ChunkTargetPtrs; + uint32_t RemoteChunkIndex = (uint32_t)-1; + }; - if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) - { - ZEN_TRACE_CPU("PartialBlockAnalysis"); - - bool LimitToSingleRange = - BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; - uint64_t TotalWantedChunksSize = 0; - std::optional> MaybeBlockRanges = - CalculateBlockRanges(BlockIndex, - BlockDescription, - BlockChunkIndexNeeded, - LimitToSingleRange, - ChunkStartOffsetInBlock, - TotalBlockSize, - TotalWantedChunksSize); - ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); - - if (MaybeBlockRanges.has_value()) - { - const std::vector& BlockRanges = MaybeBlockRanges.value(); - ZEN_ASSERT(!BlockRanges.empty()); - BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); - TotalRequestCount += BlockRanges.size(); - TotalPartWriteCount += BlockRanges.size(); - - uint64_t RequestedSize = std::accumulate( - BlockRanges.begin(), - BlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - PartialBlockIndexes.push_back(BlockIndex); - - if (RequestedSize > TotalWantedChunksSize) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", - BlockChunkIndexNeeded.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - BlockRanges.size(), - NiceBytes(RequestedSize - TotalWantedChunksSize)); - } - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - } + TotalRequestCount += NeededLooseChunkIndexes.size(); + TotalPartWriteCount += NeededLooseChunkIndexes.size(); + TotalRequestCount += PartialBlocks.BlockRanges.size(); + TotalPartWriteCount += PartialBlocks.BlockRanges.size(); + TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); + TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); - if (!PartialBlockIndexes.empty()) - { - uint64_t TotalFullBlockRequestBytes = 0; - for (uint32_t BlockIndex : FullBlockWorks) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + std::vector LooseChunkHashWorks; + for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) + { + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + std::vector ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - uint64_t TotalPartialBlockBytes = 0; - for (uint32_t BlockIndex : PartialBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + ZEN_ASSERT(!ChunkTargetPtrs.empty()); + LooseChunkHashWorks.push_back( + LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } - TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + ZEN_TRACE_CPU("WriteChunks"); - uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); - const uint64_t TotalPartialBlockRequestBytes = - std::accumulate(BlockRangeWorks.begin(), - BlockRangeWorks.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); + Stopwatch WriteTimer; - uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; - double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Analysis of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " - "requests. Completed in {}", - NiceBytes(TotalSavedBlocksSize), - NiceBytes(NonPartialTotalBlockBytes), - SavedSizePercent, - TotalExtraPartialBlocksRequests, - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } - } - } + std::unique_ptr WriteProgressBarPtr( + m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); + OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + TotalPartWriteCount += CopyChunkDatas.size(); + TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); BufferedWriteFileCache WriteCache; @@ -1465,13 +1335,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) }); } - for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; } + + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + Work.ScheduleWork( m_NetworkPool, [this, @@ -1485,119 +1365,127 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &Work, - &BlockRangeWorks, - BlockRangeIndex](std::atomic&) { + &PartialBlocks, + BlockRangeStartIndex = BlockRangeIndex, + RangeCount](std::atomic&) { if (!m_AbortFlag) { - ZEN_TRACE_CPU("Async_GetPartialBlock"); - - const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex]; + ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); FilteredDownloadedBytesPerSecond.Start(); - DownloadPartialBlock( - BlockRange, - ExistsResult, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalRequestCount, - TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &BlockRange, - BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); + for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount; + BlockRangeIndex++) + { + ZEN_TRACE_CPU("GetPartialBlock"); - const uint32_t BlockIndex = BlockRange.BlockIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + DownloadPartialBlock( + BlockRange, + ExistsResult, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &BlockRange, + BlockChunkPath = std::filesystem::path(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic&) mutable { + if (!m_AbortFlag) { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + ZEN_TRACE_CPU("Async_WritePartialBlock"); + + const uint32_t BlockIndex = BlockRange.BlockIndex; + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) { - throw std::runtime_error( - fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error( + fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - - FilteredWrittenBytesPerSecond.Start(); - if (!WritePartialBlockChunksToCache( - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockChunksToCache( + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog - : WorkerThreadPool::EMode::EnableBacklog); - } - }); + }, + OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog + : WorkerThreadPool::EMode::EnableBacklog); + } + }); + } } }); + BlockRangeIndex += RangeCount; } - for (uint32_t BlockIndex : FullBlockWorks) + for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes) { if (m_AbortFlag) { @@ -3282,271 +3170,9 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } } -BuildsOperationUpdateFolder::BlockRangeDescriptor -BuildsOperationUpdateFolder::MergeBlockRanges(std::span Ranges) -{ - ZEN_ASSERT(Ranges.size() > 1); - const BlockRangeDescriptor& First = Ranges.front(); - const BlockRangeDescriptor& Last = Ranges.back(); - - return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, - .RangeStart = First.RangeStart, - .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, - .ChunkBlockIndexStart = First.ChunkBlockIndexStart, - .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; -} - -std::optional> -BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) -{ - if (Range.RangeLength == TotalBlockSize) - { - return {}; - } - else - { - return std::vector{Range}; - } -}; - -const BuildsOperationUpdateFolder::BlockRangeLimit* -BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span Limits, - uint64_t TotalBlockSize, - std::span Ranges) -{ - if (Ranges.size() > 1) - { - const std::uint64_t WantedSize = - std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; - - for (const BlockRangeLimit& Limit : Limits) - { - if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) - { - return &Limit; - } - } - } - return nullptr; -}; - -std::vector -BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - std::vector CollapsedBlockRanges; - - auto BlockRangesIt = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*BlockRangesIt++); - for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - - const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; - - const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) - { - LastRange.ChunkBlockIndexCount = - (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; - } - else - { - CollapsedBlockRanges.push_back(*BlockRangesIt); - } - } - - return CollapsedBlockRanges; -}; - -uint64_t -BuildsOperationUpdateFolder::CalculateNextGap(std::span BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - uint64_t AcceptableGap = (uint64_t)-1; - for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) - { - const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; - const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; - - const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); - AcceptableGap = Min(Gap, AcceptableGap); - } - AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); - return AcceptableGap; -}; - -std::optional> -BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t BlockIndex, - const ChunkBlockDescription& BlockDescription, - std::span BlockChunkIndexNeeded, - bool LimitToSingleRange, - const uint64_t ChunkStartOffsetInBlock, - const uint64_t TotalBlockSize, - uint64_t& OutTotalWantedChunksSize) -{ - ZEN_TRACE_CPU("CalculateBlockRanges"); - - std::vector BlockRanges; - { - uint64_t CurrentOffset = ChunkStartOffsetInBlock; - uint32_t ChunkBlockIndex = 0; - uint32_t NeedBlockChunkIndexOffset = 0; - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) - { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; - } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength == 0) - { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; - } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; - } - else - { - ZEN_ASSERT(false); - } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - } - ZEN_ASSERT(!BlockRanges.empty()); - - OutTotalWantedChunksSize = - std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; - - if (BlockRanges.size() == 1) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize)); - } - return BlockRanges; - } - - if (LimitToSingleRange) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (m_Options.IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " - "{:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - if (RangeWantedPercent > FullBlockRangePercentLimit) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (m_Options.IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " - "({:.2f}%) wasting {:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - FullBlockRangePercentLimit, - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - std::vector CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); - while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) - { - CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); - } - - const std::uint64_t WantedCollapsedSize = - std::accumulate(CollapsedBlockRanges.begin(), - CollapsedBlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - - const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; - - if (m_Options.IsVerbose) - { - const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " - "({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(WantedCollapsedSize), - CollapsedRangeRequestedPercent, - CollapsedBlockRanges.size(), - WastedPercent, - NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); - } - return CollapsedBlockRanges; -} - void BuildsOperationUpdateFolder::DownloadPartialBlock( - const BlockRangeDescriptor BlockRange, + const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, const BlobsExistsResult& ExistsResult, std::function&& OnDownloaded) { diff --git a/src/zenremotestore/chunking/chunkblock.cpp b/src/zenremotestore/chunking/chunkblock.cpp index c4d8653f4..06cedae3f 100644 --- a/src/zenremotestore/chunking/chunkblock.cpp +++ b/src/zenremotestore/chunking/chunkblock.cpp @@ -10,18 +10,17 @@ #include +#include #include ZEN_THIRD_PARTY_INCLUDES_START #include +#include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include - -# include -# include #endif // ZEN_WITH_TESTS namespace zen { @@ -455,6 +454,537 @@ FindReuseBlocks(OperationLogOutput& Output, return FilteredReuseBlockIndexes; } +ChunkBlockAnalyser::ChunkBlockAnalyser(OperationLogOutput& LogOutput, + std::span BlockDescriptions, + const Options& Options) +: m_LogOutput(LogOutput) +, m_BlockDescriptions(BlockDescriptions) +, m_Options(Options) +{ +} + +std::vector +ChunkBlockAnalyser::GetNeeded(const tsl::robin_map& ChunkHashToChunkIndex, + std::function&& NeedsBlockChunk) +{ + ZEN_TRACE_CPU("ChunkBlockAnalyser::GetNeeded"); + + std::vector Result; + + std::vector ChunkIsNeeded(ChunkHashToChunkIndex.size()); + for (uint32_t ChunkIndex = 0; ChunkIndex < ChunkHashToChunkIndex.size(); ChunkIndex++) + { + ChunkIsNeeded[ChunkIndex] = NeedsBlockChunk(ChunkIndex); + } + + std::vector BlockSlack(m_BlockDescriptions.size(), 0u); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + uint64_t BlockUsedSize = 0; + uint64_t BlockSize = 0; + + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (ChunkIsNeeded[RemoteChunkIndex]) + { + BlockUsedSize += BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + } + } + BlockSize += BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + } + BlockSlack[BlockIndex] = BlockSize - BlockUsedSize; + } + + std::vector BlockOrder(m_BlockDescriptions.size()); + std::iota(BlockOrder.begin(), BlockOrder.end(), 0); + + std::sort(BlockOrder.begin(), BlockOrder.end(), [&BlockSlack](uint32_t Lhs, uint32_t Rhs) { + return BlockSlack[Lhs] < BlockSlack[Rhs]; + }); + + std::vector ChunkIsPickedUp(ChunkHashToChunkIndex.size(), false); + + for (uint32_t BlockIndex : BlockOrder) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + std::vector BlockChunkIndexNeeded; + + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = ChunkHashToChunkIndex.find(ChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (ChunkIsNeeded[RemoteChunkIndex]) + { + if (!ChunkIsPickedUp[RemoteChunkIndex]) + { + ChunkIsPickedUp[RemoteChunkIndex] = true; + BlockChunkIndexNeeded.push_back(ChunkBlockIndex); + } + } + } + else + { + ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash); + } + } + + if (!BlockChunkIndexNeeded.empty()) + { + Result.push_back(NeededBlock{.BlockIndex = BlockIndex, .ChunkIndexes = std::move(BlockChunkIndexNeeded)}); + } + } + return Result; +} + +ChunkBlockAnalyser::BlockResult +ChunkBlockAnalyser::CalculatePartialBlockDownloads(std::span NeededBlocks, + std::span BlockPartialDownloadModes) +{ + ZEN_TRACE_CPU("ChunkBlockAnalyser::CalculatePartialBlockDownloads"); + + Stopwatch PartialAnalisysTimer; + + ChunkBlockAnalyser::BlockResult Result; + + uint64_t IdealDownloadTotalSize = 0; + uint64_t AllBlocksTotalBlocksSize = 0; + + for (const NeededBlock& NeededBlock : NeededBlocks) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + + std::span BlockChunkIndexNeeded(NeededBlock.ChunkIndexes); + if (!NeededBlock.ChunkIndexes.empty()) + { + bool WantsToDoPartialBlockDownload = NeededBlock.ChunkIndexes.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + + EPartialBlockDownloadMode PartialBlockDownloadMode = BlockPartialDownloadModes[NeededBlock.BlockIndex]; + + const uint32_t ChunkStartOffsetInBlock = + gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(ChunkStartOffsetInBlock)); + + AllBlocksTotalBlocksSize += TotalBlockSize; + + if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Off) && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + ZEN_TRACE_CPU("PartialBlockAnalysis"); + + uint64_t TotalWantedChunksSize = 0; + std::optional> MaybeBlockRanges = CalculateBlockRanges(NeededBlock.BlockIndex, + BlockDescription, + NeededBlock.ChunkIndexes, + PartialBlockDownloadMode, + ChunkStartOffsetInBlock, + TotalBlockSize, + TotalWantedChunksSize); + ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); + IdealDownloadTotalSize += TotalWantedChunksSize; + + if (MaybeBlockRanges.has_value()) + { + const std::vector& BlockRanges = MaybeBlockRanges.value(); + ZEN_ASSERT(!BlockRanges.empty()); + + uint64_t RequestedSize = + std::accumulate(BlockRanges.begin(), + BlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + + if ((PartialBlockDownloadMode != EPartialBlockDownloadMode::Exact) && ((RequestedSize * 100) / TotalBlockSize) >= 200) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using full block request (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(TotalBlockSize - TotalWantedChunksSize)); + } + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + } + else + { + Result.BlockRanges.insert(Result.BlockRanges.end(), BlockRanges.begin(), BlockRanges.end()); + + if (RequestedSize > TotalWantedChunksSize) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", + NeededBlock.ChunkIndexes.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + BlockRanges.size(), + NiceBytes(RequestedSize - TotalWantedChunksSize)); + } + } + } + } + else + { + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + } + } + else + { + Result.FullBlockIndexes.push_back(NeededBlock.BlockIndex); + IdealDownloadTotalSize += TotalBlockSize; + } + } + } + + if (!Result.BlockRanges.empty() && !m_Options.IsQuiet) + { + tsl::robin_set PartialBlockIndexes; + uint64_t PartialBlocksTotalSize = std::accumulate(Result.BlockRanges.begin(), + Result.BlockRanges.end(), + uint64_t(0u), + [&](uint64_t Current, const BlockRangeDescriptor& Range) { + PartialBlockIndexes.insert(Range.BlockIndex); + return Current + Range.RangeLength; + }); + + uint64_t FullBlocksTotalSize = + std::accumulate(Result.FullBlockIndexes.begin(), + Result.FullBlockIndexes.end(), + uint64_t(0u), + [&](uint64_t Current, uint32_t BlockIndex) { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + return Current + std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + }); + + uint64_t PartialBlockRequestCount = Result.BlockRanges.size(); + uint64_t PartialBlockCount = PartialBlockIndexes.size(); + + uint64_t TotalExtraPartialBlocksRequestCount = PartialBlockRequestCount - PartialBlockCount; + uint64_t ActualPartialDownloadTotalSize = FullBlocksTotalSize + PartialBlocksTotalSize; + + uint64_t IdealSkippedSize = AllBlocksTotalBlocksSize - IdealDownloadTotalSize; + uint64_t ActualSkippedSize = AllBlocksTotalBlocksSize - ActualPartialDownloadTotalSize; + + double PercentOfIdealPartialSkippedSize = (ActualSkippedSize * 100.0) / IdealSkippedSize; + + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Analysis of partial block requests saves download of {} out of {}, {:.1f}% of possible {} using {} extra " + "requests. Completed in {}", + NiceBytes(ActualSkippedSize), + NiceBytes(AllBlocksTotalBlocksSize), + PercentOfIdealPartialSkippedSize, + NiceBytes(IdealSkippedSize), + TotalExtraPartialBlocksRequestCount, + NiceTimeSpanMs(PartialAnalisysTimer.GetElapsedTimeMs())); + } + + return Result; +} + +ChunkBlockAnalyser::BlockRangeDescriptor +ChunkBlockAnalyser::MergeBlockRanges(std::span Ranges) +{ + ZEN_ASSERT(Ranges.size() > 1); + const BlockRangeDescriptor& First = Ranges.front(); + const BlockRangeDescriptor& Last = Ranges.back(); + + return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, + .RangeStart = First.RangeStart, + .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, + .ChunkBlockIndexStart = First.ChunkBlockIndexStart, + .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; +} + +std::optional> +ChunkBlockAnalyser::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) +{ + if (Range.RangeLength == TotalBlockSize) + { + return {}; + } + else + { + return std::vector{Range}; + } +}; + +const ChunkBlockAnalyser::BlockRangeLimit* +ChunkBlockAnalyser::GetBlockRangeLimitForRange(std::span Limits, + uint64_t TotalBlockSize, + std::span Ranges) +{ + if (Ranges.size() > 1) + { + const std::uint64_t WantedSize = + std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; + + for (const BlockRangeLimit& Limit : Limits) + { + if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) + { + return &Limit; + } + } + } + return nullptr; +}; + +std::vector +ChunkBlockAnalyser::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + std::vector CollapsedBlockRanges; + + auto BlockRangesIt = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*BlockRangesIt++); + for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + + const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; + + const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) + { + LastRange.ChunkBlockIndexCount = + (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; + } + else + { + CollapsedBlockRanges.push_back(*BlockRangesIt); + } + } + + return CollapsedBlockRanges; +}; + +uint64_t +ChunkBlockAnalyser::CalculateNextGap(std::span BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + uint64_t AcceptableGap = (uint64_t)-1; + for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) + { + const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; + const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; + + const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); + AcceptableGap = Min(Gap, AcceptableGap); + } + AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); + return AcceptableGap; +}; + +std::optional> +ChunkBlockAnalyser::CalculateBlockRanges(uint32_t BlockIndex, + const ChunkBlockDescription& BlockDescription, + std::span BlockChunkIndexNeeded, + EPartialBlockDownloadMode PartialBlockDownloadMode, + const uint64_t ChunkStartOffsetInBlock, + const uint64_t TotalBlockSize, + uint64_t& OutTotalWantedChunksSize) +{ + ZEN_TRACE_CPU("CalculateBlockRanges"); + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::Off) + { + return {}; + } + + std::vector BlockRanges; + { + uint64_t CurrentOffset = ChunkStartOffsetInBlock; + uint32_t ChunkBlockIndex = 0; + uint32_t NeedBlockChunkIndexOffset = 0; + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; + } + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength == 0) + { + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + } + } + ZEN_ASSERT(!BlockRanges.empty()); + + OutTotalWantedChunksSize = + std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; + + if (BlockRanges.size() == 1) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + } + return BlockRanges; + } + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::Exact) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + } + return BlockRanges; + } + + if (PartialBlockDownloadMode == EPartialBlockDownloadMode::SingleRange) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (m_Options.IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " + "{:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + if (RangeWantedPercent > FullBlockRangePercentLimit) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (m_Options.IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " + "({:.2f}%) wasting {:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + FullBlockRangePercentLimit, + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + std::vector CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); + while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) + { + CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); + } + + const std::uint64_t WantedCollapsedSize = + std::accumulate(CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + + const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; + + if (m_Options.IsVerbose) + { + const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; + + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " + "({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(WantedCollapsedSize), + CollapsedRangeRequestedPercent, + CollapsedBlockRanges.size(), + WastedPercent, + NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); + } + return CollapsedBlockRanges; +} + #if ZEN_WITH_TESTS namespace testutils { @@ -476,7 +1006,7 @@ namespace testutils { } // namespace testutils -TEST_CASE("project.store.block") +TEST_CASE("chunkblock.block") { using namespace std::literals; using namespace testutils; @@ -504,7 +1034,7 @@ TEST_CASE("project.store.block") HeaderSize)); } -TEST_CASE("project.store.reuseblocks") +TEST_CASE("chunkblock.reuseblocks") { using namespace std::literals; using namespace testutils; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 9e5bf8d91..6800444e0 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -218,33 +219,6 @@ private: uint64_t ElapsedTimeMs = 0; }; - struct BlockRangeDescriptor - { - uint32_t BlockIndex = (uint32_t)-1; - uint64_t RangeStart = 0; - uint64_t RangeLength = 0; - uint32_t ChunkBlockIndexStart = 0; - uint32_t ChunkBlockIndexCount = 0; - }; - - struct BlockRangeLimit - { - uint16_t SizePercent; - uint16_t MaxRangeCount; - }; - - static constexpr uint16_t FullBlockRangePercentLimit = 95; - - static constexpr BuildsOperationUpdateFolder::BlockRangeLimit ForceMergeLimits[] = { - {.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, - {.SizePercent = 90, .MaxRangeCount = 2}, - {.SizePercent = 85, .MaxRangeCount = 8}, - {.SizePercent = 80, .MaxRangeCount = 16}, - {.SizePercent = 70, .MaxRangeCount = 32}, - {.SizePercent = 60, .MaxRangeCount = 48}, - {.SizePercent = 2, .MaxRangeCount = 56}, - {.SizePercent = 0, .MaxRangeCount = 64}}; - void ScanCacheFolder(tsl::robin_map& OutCachedChunkHashesFound, tsl::robin_map& OutCachedSequenceHashesFound); void ScanTempBlocksFolder(tsl::robin_map& OutCachedBlocksFound); @@ -299,25 +273,9 @@ private: ParallelWork& Work, std::function&& OnDownloaded); - BlockRangeDescriptor MergeBlockRanges(std::span Ranges); - std::optional> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, - const BlockRangeDescriptor& Range); - const BlockRangeLimit* GetBlockRangeLimitForRange(std::span Limits, - uint64_t TotalBlockSize, - std::span Ranges); - std::vector CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, - std::span BlockRanges); - uint64_t CalculateNextGap(std::span BlockRanges); - std::optional> CalculateBlockRanges(uint32_t BlockIndex, - const ChunkBlockDescription& BlockDescription, - std::span BlockChunkIndexNeeded, - bool LimitToSingleRange, - const uint64_t ChunkStartOffsetInBlock, - const uint64_t TotalBlockSize, - uint64_t& OutTotalWantedChunksSize); - void DownloadPartialBlock(const BlockRangeDescriptor BlockRange, - const BlobsExistsResult& ExistsResult, - std::function&& OnDownloaded); + void DownloadPartialBlock(const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, + const BlobsExistsResult& ExistsResult, + std::function&& OnDownloaded); std::vector WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, const CopyChunkData& CopyData, diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h index d339b0f94..57710fcf5 100644 --- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h +++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h @@ -7,6 +7,10 @@ #include #include +ZEN_THIRD_PARTY_INCLUDES_START +#include +ZEN_THIRD_PARTY_INCLUDES_END + #include #include @@ -73,6 +77,96 @@ std::vector FindReuseBlocks(OperationLogOutput& Output, std::span ChunkIndexes, std::vector& OutUnusedChunkIndexes); +class ChunkBlockAnalyser +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + }; + + ChunkBlockAnalyser(OperationLogOutput& LogOutput, std::span BlockDescriptions, const Options& Options); + + struct BlockRangeDescriptor + { + uint32_t BlockIndex = (uint32_t)-1; + uint64_t RangeStart = 0; + uint64_t RangeLength = 0; + uint32_t ChunkBlockIndexStart = 0; + uint32_t ChunkBlockIndexCount = 0; + }; + + struct NeededBlock + { + uint32_t BlockIndex; + std::vector ChunkIndexes; + }; + + std::vector GetNeeded(const tsl::robin_map& ChunkHashToChunkIndex, + std::function&& NeedsBlockChunk); + + enum EPartialBlockDownloadMode + { + Off, + SingleRange, + On, + Exact + }; + + struct BlockResult + { + std::vector BlockRanges; + std::vector FullBlockIndexes; + }; + + BlockResult CalculatePartialBlockDownloads(std::span NeededBlocks, + std::span BlockPartialDownloadModes); + +private: + struct BlockRangeLimit + { + uint16_t SizePercent; + uint16_t MaxRangeCount; + }; + + static constexpr uint16_t FullBlockRangePercentLimit = 95; + + static constexpr BlockRangeLimit ForceMergeLimits[] = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, + {.SizePercent = 90, .MaxRangeCount = 2}, + {.SizePercent = 85, .MaxRangeCount = 8}, + {.SizePercent = 80, .MaxRangeCount = 16}, + {.SizePercent = 75, .MaxRangeCount = 32}, + {.SizePercent = 70, .MaxRangeCount = 48}, + {.SizePercent = 4, .MaxRangeCount = 82}, + {.SizePercent = 0, .MaxRangeCount = 96}}; + + BlockRangeDescriptor MergeBlockRanges(std::span Ranges); + std::optional> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, + const BlockRangeDescriptor& Range); + const BlockRangeLimit* GetBlockRangeLimitForRange(std::span Limits, + uint64_t TotalBlockSize, + std::span Ranges); + std::vector CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, + std::span BlockRanges); + uint64_t CalculateNextGap(std::span BlockRanges); + std::optional> CalculateBlockRanges(uint32_t BlockIndex, + const ChunkBlockDescription& BlockDescription, + std::span BlockChunkIndexNeeded, + EPartialBlockDownloadMode PartialBlockDownloadMode, + const uint64_t ChunkStartOffsetInBlock, + const uint64_t TotalBlockSize, + uint64_t& OutTotalWantedChunksSize); + + OperationLogOutput& m_LogOutput; + const std::span m_BlockDescriptions; + const Options m_Options; +}; + +#if ZEN_WITH_TESTS + +class CbWriter; void chunkblock_forcelink(); +#endif // ZEN_WITH_TESTS } // namespace zen -- cgit v1.2.3