diff options
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 889 |
1 files changed, 254 insertions, 635 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index ade431393..4f1b07c37 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -579,13 +579,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) CreateDirectories(m_TempDownloadFolderPath); CreateDirectories(m_TempBlockFolderPath); - Stopwatch IndexTimer; - - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); - } - Stopwatch CacheMappingTimer; std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); @@ -906,343 +899,213 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) CheckRequiredDiskSpace(RemotePathToRemoteIndex); + BlobsExistsResult ExistsResult; { - ZEN_TRACE_CPU("WriteChunks"); + ChunkBlockAnalyser BlockAnalyser(m_LogOutput, + m_BlockDescriptions, + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose}); - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); - - Stopwatch WriteTimer; - - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredWrittenBytesPerSecond; + std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded( + m_RemoteLookup.ChunkHashToChunkIndex, + [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); - std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::vector<uint32_t> FetchBlockIndexes; + std::vector<uint32_t> CachedChunkBlockIndexes; - struct LooseChunkHashWorkData { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - uint32_t RemoteChunkIndex = (uint32_t)-1; - }; - - std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; - TotalPartWriteCount += CopyChunkDatas.size(); - TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); - - for (const IoHash ChunkHash : m_LooseChunkHashes) - { - auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); - } - continue; - } - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) + ZEN_TRACE_CPU("BlockCacheFileExists"); + for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (ChunkTargetPtrs.empty()) + if (m_Options.PrimeCacheOnly) { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); - } + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); } else { - TotalRequestCount++; - TotalPartWriteCount++; - LooseChunkHashWorks.push_back( - LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); - } - } - } - - uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size()); - - std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false); - auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock]( - const ChunkBlockDescription& BlockDescription) { - ZEN_TRACE_CPU("GetNeededChunkBlockIndexes"); - std::vector<uint32_t> NeededBlockChunkIndexes; - for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) - { - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; - if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + TotalPartWriteCount++; + + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) { - ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; - NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); + UsingCachedBlock = true; } } - } - else - { - ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash); + if (!UsingCachedBlock) + { + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + } } } - return NeededBlockChunkIndexes; - }; + } - std::vector<uint32_t> CachedChunkBlockIndexes; - std::vector<uint32_t> FetchBlockIndexes; - std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded; + std::vector<uint32_t> NeededLooseChunkIndexes; - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); - if (!BlockChunkIndexNeeded.empty()) + NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); + for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++) { - if (m_Options.PrimeCacheOnly) + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - FetchBlockIndexes.push_back(BlockIndex); + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Skipping chunk {} due to cache reuse", + m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); + } + continue; } - else + + bool NeedsCopy = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (WriteCount == 0) { - TotalPartWriteCount++; - - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) + if (m_Options.IsVerbose) { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Skipping chunk {} due to cache reuse", + m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } } - if (!UsingCachedBlock) + else { - FetchBlockIndexes.push_back(BlockIndex); + NeededLooseChunkIndexes.push_back(LooseChunkIndex); } } } - AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); } - BlobsExistsResult ExistsResult; - if (m_Storage.BuildCacheStorage) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; - tsl::robin_set<IoHash> BlobHashesSet; + std::vector<IoHash> BlobHashes; + BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); - BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); - for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { - BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); } + for (uint32_t BlockIndex : FetchBlockIndexes) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - BlobHashesSet.insert(BlockDescription.BlockHash); + BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); } - if (!BlobHashesSet.empty()) - { - const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); - if (CacheExistsResult.size() == BlobHashes.size()) + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { - ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + if (CacheExistsResult[BlobIndex].HasBody) { - if (CacheExistsResult[BlobIndex].HasBody) - { - ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); - } + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); } } - ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Remote cache : Found {} out of {} needed blobs in {}", - ExistsResult.ExistingBlobs.size(), - BlobHashes.size(), - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); } } - std::vector<BlockRangeDescriptor> BlockRangeWorks; - std::vector<uint32_t> FullBlockWorks; + std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes; + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { - Stopwatch Timer; - - std::vector<uint32_t> PartialBlockIndexes; - - for (uint32_t BlockIndex : FetchBlockIndexes) + BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) + { + BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::On); + } + else + { + BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); - if (!BlockChunkIndexNeeded.empty()) + const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = - (BlockDescription.HeaderSize > 0) && - (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); - - bool AllowedToDoPartialRequest = false; - bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - switch (m_Options.PartialBlockRequestMode) - { - case EPartialBlockRequestMode::Off: - break; - case EPartialBlockRequestMode::ZenCacheOnly: - AllowedToDoPartialRequest = BlockExistInCache; - break; - case EPartialBlockRequestMode::Mixed: - case EPartialBlockRequestMode::All: - AllowedToDoPartialRequest = true; - break; - default: - ZEN_ASSERT(false); - break; - } + BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On + : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + } + else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) + { + BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::On + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); + } + } + } + ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); - const uint32_t ChunkStartOffsetInBlock = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + ChunkBlockAnalyser::BlockResult PartialBlocks = + BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); - const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(ChunkStartOffsetInBlock)); + struct LooseChunkHashWorkData + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + uint32_t RemoteChunkIndex = (uint32_t)-1; + }; - if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) - { - ZEN_TRACE_CPU("PartialBlockAnalysis"); - - bool LimitToSingleRange = - BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; - uint64_t TotalWantedChunksSize = 0; - std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = - CalculateBlockRanges(BlockIndex, - BlockDescription, - BlockChunkIndexNeeded, - LimitToSingleRange, - ChunkStartOffsetInBlock, - TotalBlockSize, - TotalWantedChunksSize); - ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); - - if (MaybeBlockRanges.has_value()) - { - const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); - ZEN_ASSERT(!BlockRanges.empty()); - BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); - TotalRequestCount += BlockRanges.size(); - TotalPartWriteCount += BlockRanges.size(); - - uint64_t RequestedSize = std::accumulate( - BlockRanges.begin(), - BlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - PartialBlockIndexes.push_back(BlockIndex); - - if (RequestedSize > TotalWantedChunksSize) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", - BlockChunkIndexNeeded.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - BlockRanges.size(), - NiceBytes(RequestedSize - TotalWantedChunksSize)); - } - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - } + TotalRequestCount += NeededLooseChunkIndexes.size(); + TotalPartWriteCount += NeededLooseChunkIndexes.size(); + TotalRequestCount += PartialBlocks.BlockRanges.size(); + TotalPartWriteCount += PartialBlocks.BlockRanges.size(); + TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); + TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); - if (!PartialBlockIndexes.empty()) - { - uint64_t TotalFullBlockRequestBytes = 0; - for (uint32_t BlockIndex : FullBlockWorks) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; + for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) + { + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - uint64_t TotalPartialBlockBytes = 0; - for (uint32_t BlockIndex : PartialBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + ZEN_ASSERT(!ChunkTargetPtrs.empty()); + LooseChunkHashWorks.push_back( + LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } - TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + ZEN_TRACE_CPU("WriteChunks"); + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); - uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; + Stopwatch WriteTimer; - const uint64_t TotalPartialBlockRequestBytes = - std::accumulate(BlockRangeWorks.begin(), - BlockRangeWorks.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; - uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; - double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; + std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr( + m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); + OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Analysis of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " - "requests. Completed in {}", - NiceBytes(TotalSavedBlocksSize), - NiceBytes(NonPartialTotalBlockBytes), - SavedSizePercent, - TotalExtraPartialBlocksRequests, - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } - } - } + TotalPartWriteCount += CopyChunkDatas.size(); + TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); BufferedWriteFileCache WriteCache; @@ -1472,13 +1335,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) }); } - for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; } + + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + Work.ScheduleWork( m_NetworkPool, [this, @@ -1492,119 +1365,127 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &Work, - &BlockRangeWorks, - BlockRangeIndex](std::atomic<bool>&) { + &PartialBlocks, + BlockRangeStartIndex = BlockRangeIndex, + RangeCount](std::atomic<bool>&) { if (!m_AbortFlag) { - ZEN_TRACE_CPU("Async_GetPartialBlock"); - - const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex]; + ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); FilteredDownloadedBytesPerSecond.Start(); - DownloadPartialBlock( - BlockRange, - ExistsResult, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalRequestCount, - TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &BlockRange, - BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); + for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount; + BlockRangeIndex++) + { + ZEN_TRACE_CPU("GetPartialBlock"); - const uint32_t BlockIndex = BlockRange.BlockIndex; + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + DownloadPartialBlock( + BlockRange, + ExistsResult, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &BlockRange, + BlockChunkPath = std::filesystem::path(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + ZEN_TRACE_CPU("Async_WritePartialBlock"); + + const uint32_t BlockIndex = BlockRange.BlockIndex; + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) { - throw std::runtime_error( - fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error( + fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - - FilteredWrittenBytesPerSecond.Start(); - if (!WritePartialBlockChunksToCache( - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockChunksToCache( + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog - : WorkerThreadPool::EMode::EnableBacklog); - } - }); + }, + OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog + : WorkerThreadPool::EMode::EnableBacklog); + } + }); + } } }); + BlockRangeIndex += RangeCount; } - for (uint32_t BlockIndex : FullBlockWorks) + for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes) { if (m_AbortFlag) { @@ -3289,271 +3170,9 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } } -BuildsOperationUpdateFolder::BlockRangeDescriptor -BuildsOperationUpdateFolder::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges) -{ - ZEN_ASSERT(Ranges.size() > 1); - const BlockRangeDescriptor& First = Ranges.front(); - const BlockRangeDescriptor& Last = Ranges.back(); - - return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, - .RangeStart = First.RangeStart, - .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, - .ChunkBlockIndexStart = First.ChunkBlockIndexStart, - .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; -} - -std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>> -BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) -{ - if (Range.RangeLength == TotalBlockSize) - { - return {}; - } - else - { - return std::vector<BlockRangeDescriptor>{Range}; - } -}; - -const BuildsOperationUpdateFolder::BlockRangeLimit* -BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, - uint64_t TotalBlockSize, - std::span<const BlockRangeDescriptor> Ranges) -{ - if (Ranges.size() > 1) - { - const std::uint64_t WantedSize = - std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; - - for (const BlockRangeLimit& Limit : Limits) - { - if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) - { - return &Limit; - } - } - } - return nullptr; -}; - -std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor> -BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - - auto BlockRangesIt = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*BlockRangesIt++); - for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - - const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; - - const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) - { - LastRange.ChunkBlockIndexCount = - (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; - } - else - { - CollapsedBlockRanges.push_back(*BlockRangesIt); - } - } - - return CollapsedBlockRanges; -}; - -uint64_t -BuildsOperationUpdateFolder::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - uint64_t AcceptableGap = (uint64_t)-1; - for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) - { - const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; - const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; - - const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); - AcceptableGap = Min(Gap, AcceptableGap); - } - AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); - return AcceptableGap; -}; - -std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>> -BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t BlockIndex, - const ChunkBlockDescription& BlockDescription, - std::span<const uint32_t> BlockChunkIndexNeeded, - bool LimitToSingleRange, - const uint64_t ChunkStartOffsetInBlock, - const uint64_t TotalBlockSize, - uint64_t& OutTotalWantedChunksSize) -{ - ZEN_TRACE_CPU("CalculateBlockRanges"); - - std::vector<BlockRangeDescriptor> BlockRanges; - { - uint64_t CurrentOffset = ChunkStartOffsetInBlock; - uint32_t ChunkBlockIndex = 0; - uint32_t NeedBlockChunkIndexOffset = 0; - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) - { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; - } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength == 0) - { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; - } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; - } - else - { - ZEN_ASSERT(false); - } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - } - ZEN_ASSERT(!BlockRanges.empty()); - - OutTotalWantedChunksSize = - std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; - - if (BlockRanges.size() == 1) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize)); - } - return BlockRanges; - } - - if (LimitToSingleRange) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (m_Options.IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " - "{:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - if (RangeWantedPercent > FullBlockRangePercentLimit) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (m_Options.IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " - "({:.2f}%) wasting {:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - FullBlockRangePercentLimit, - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); - while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) - { - CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); - } - - const std::uint64_t WantedCollapsedSize = - std::accumulate(CollapsedBlockRanges.begin(), - CollapsedBlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - - const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; - - if (m_Options.IsVerbose) - { - const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; - - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " - "({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(WantedCollapsedSize), - CollapsedRangeRequestedPercent, - CollapsedBlockRanges.size(), - WastedPercent, - NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); - } - return CollapsedBlockRanges; -} - void BuildsOperationUpdateFolder::DownloadPartialBlock( - const BlockRangeDescriptor BlockRange, + const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, const BlobsExistsResult& ExistsResult, std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded) { |