diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-16 09:55:35 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-16 09:55:35 +0200 |
| commit | f426eab5d5a361bb4661b814ceb7ed97113130cd (patch) | |
| tree | d273892e7d86871eabe5c85fac6f0f7b20a6bf70 | |
| parent | move builds state functions to buildsavedstate.h/cpp (#577) (diff) | |
| download | zen-f426eab5d5a361bb4661b814ceb7ed97113130cd.tar.xz zen-f426eab5d5a361bb4661b814ceb7ed97113130cd.zip | |
refactor builds cmd part4 (#579)
* move lambdas to class functions
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 1428 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 116 |
2 files changed, 804 insertions, 740 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 469d29e0d..ef180935e 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -195,7 +195,8 @@ namespace { const std::uint64_t PreferredMultipartChunkSize, ParallelWork& Work, WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, + std::atomic<uint64_t>& DownloadedChunkByteCount, + std::atomic<uint64_t>& MultipartAttachmentCount, std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); @@ -217,8 +218,8 @@ namespace { BuildId, ChunkHash, PreferredMultipartChunkSize, - [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { - DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + [&Work, Workload, &DownloadedChunkByteCount](uint64_t Offset, const IoBuffer& Chunk) { + DownloadedChunkByteCount += Chunk.GetSize(); if (!Work.IsAborted()) { @@ -226,8 +227,7 @@ namespace { Workload->TempFile.Write(Chunk.GetView(), Offset); } }, - [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { - DownloadStats.DownloadedChunkCount++; + [&Work, Workload, &DownloadedChunkByteCount, OnDownloadComplete = std::move(OnDownloadComplete)]() { if (!Work.IsAborted()) { ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); @@ -242,7 +242,7 @@ namespace { }); if (!WorkItems.empty()) { - DownloadStats.MultipartAttachmentCount++; + MultipartAttachmentCount++; } for (auto& WorkItem : WorkItems) { @@ -344,293 +344,6 @@ namespace { } // namespace -struct BlockRangeDescriptor -{ - uint32_t BlockIndex = (uint32_t)-1; - uint64_t RangeStart = 0; - uint64_t RangeLength = 0; - uint32_t ChunkBlockIndexStart = 0; - uint32_t ChunkBlockIndexCount = 0; -}; - -BlockRangeDescriptor -MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges) -{ - ZEN_ASSERT(Ranges.size() > 1); - const 
BlockRangeDescriptor& First = Ranges.front(); - const BlockRangeDescriptor& Last = Ranges.back(); - - return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, - .RangeStart = First.RangeStart, - .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, - .ChunkBlockIndexStart = First.ChunkBlockIndexStart, - .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; -} - -std::optional<std::vector<BlockRangeDescriptor>> -MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) -{ - if (Range.RangeLength == TotalBlockSize) - { - return {}; - } - else - { - return std::vector<BlockRangeDescriptor>{Range}; - } -}; - -struct BlockRangeLimit -{ - uint16_t SizePercent; - uint16_t MaxRangeCount; -}; - -static const uint16_t FullBlockRangePercentLimit = 95; - -const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, - {.SizePercent = 90, .MaxRangeCount = 2}, - {.SizePercent = 85, .MaxRangeCount = 8}, - {.SizePercent = 80, .MaxRangeCount = 16}, - {.SizePercent = 70, .MaxRangeCount = 32}, - {.SizePercent = 60, .MaxRangeCount = 48}, - {.SizePercent = 2, .MaxRangeCount = 56}, - {.SizePercent = 0, .MaxRangeCount = 64}}; - -const BlockRangeLimit* -GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, uint64_t TotalBlockSize, std::span<const BlockRangeDescriptor> Ranges) -{ - if (Ranges.size() > 1) - { - const std::uint64_t WantedSize = - std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; - - for (const BlockRangeLimit& Limit : Limits) - { - if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) - { - return &Limit; - } - } - } - return nullptr; -}; - -std::vector<BlockRangeDescriptor> 
-CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - - auto BlockRangesIt = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*BlockRangesIt++); - for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - - const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; - - const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) - { - LastRange.ChunkBlockIndexCount = - (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; - } - else - { - CollapsedBlockRanges.push_back(*BlockRangesIt); - } - } - - return CollapsedBlockRanges; -}; - -uint64_t -CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges) -{ - ZEN_ASSERT(BlockRanges.size() > 1); - uint64_t AcceptableGap = (uint64_t)-1; - for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) - { - const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; - const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; - - const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); - AcceptableGap = Min(Gap, AcceptableGap); - } - AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); - return AcceptableGap; -}; - -std::optional<std::vector<BlockRangeDescriptor>> -CalculateBlockRanges(BuildOpLogOutput& LogOutput, - bool IsVerbose, - uint32_t BlockIndex, - const ChunkBlockDescription& BlockDescription, - std::span<const uint32_t> BlockChunkIndexNeeded, - bool LimitToSingleRange, - const uint64_t ChunkStartOffsetInBlock, - const uint64_t TotalBlockSize, - 
uint64_t& OutTotalWantedChunksSize) -{ - ZEN_TRACE_CPU("CalculateBlockRanges"); - - std::vector<BlockRangeDescriptor> BlockRanges; - { - uint64_t CurrentOffset = ChunkStartOffsetInBlock; - uint32_t ChunkBlockIndex = 0; - uint32_t NeedBlockChunkIndexOffset = 0; - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) - { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; - } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength == 0) - { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; - } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; - } - else - { - ZEN_ASSERT(false); - } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - } - ZEN_ASSERT(!BlockRanges.empty()); - - OutTotalWantedChunksSize = - std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; - - if (BlockRanges.size() == 1) - { - if (IsVerbose) - { - LOG_OUTPUT(LogOutput, - "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize)); - } - return BlockRanges; - } - - if 
(LimitToSingleRange) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - LOG_OUTPUT( - LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " - "{:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - if (RangeWantedPercent > FullBlockRangePercentLimit) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - if (IsVerbose) - { - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - - LOG_OUTPUT(LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. 
Merged to single block range {} " - "({:.2f}%) wasting {:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - FullBlockRangePercentLimit, - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - } - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); - while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) - { - CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); - } - - const std::uint64_t WantedCollapsedSize = - std::accumulate(CollapsedBlockRanges.begin(), - CollapsedBlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - - const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; - - if (IsVerbose) - { - const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; - - LOG_OUTPUT( - LogOutput, - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " - "({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(WantedCollapsedSize), - CollapsedRangeRequestedPercent, - CollapsedBlockRanges.size(), - WastedPercent, - NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); - } - return CollapsedBlockRanges; -} - bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) @@ -1517,12 +1230,6 @@ 
BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); } - struct BlobsExistsResult - { - tsl::robin_set<IoHash> ExistingBlobs; - uint64_t ElapsedTimeMs = 0; - }; - BlobsExistsResult ExistsResult; if (m_Storage.BuildCacheStorage) @@ -1622,9 +1329,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) bool LimitToSingleRange = BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; uint64_t TotalWantedChunksSize = 0; - std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(m_LogOutput, - m_Options.IsQuiet, - BlockIndex, + std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(BlockIndex, BlockDescription, BlockChunkIndexNeeded, LimitToSingleRange, @@ -1780,274 +1485,47 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; - - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = std::move(LooseChunkHashWork.ChunkTargetPtrs); - const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; - - if (m_Options.PrimeCacheOnly && - ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + if (m_Options.PrimeCacheOnly) { - m_DownloadStats.RequestsCompleteCount++; - continue; + const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; + if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + { + m_DownloadStats.RequestsCompleteCount++; + continue; + } } - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &ExistsResult, - &WritePartsComplete, - RemoteChunkIndex, - ChunkTargetPtrs, - TotalRequestCount, - 
TotalPartWriteCount, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); - - std::filesystem::path ExistingCompressedChunkPath; - if (!m_Options.PrimeCacheOnly) - { - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); - if (!ExistingCompressedChunkPath.empty()) - { - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - } - } - if (!m_AbortFlag) - - { - if (!ExistingCompressedChunkPath.empty()) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } - - bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, - ChunkTargetPtrs, - WriteCache, - std::move(CompressedPart)); - WritePartsComplete++; - - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - std::error_code Ec = TryRemoveFile(CompressedChunkPath); - if (Ec) - { - LOG_OUTPUT_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - CompressedChunkPath, - 
Ec.value(), - Ec.message()); - } - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - WriteCache.Close(CompletedSequences); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); - } - else - { - FinalizeChunkSequences(CompletedSequences); - } - } - } - }); - } - else - { - Work.ScheduleWork( - m_NetworkPool, - [this, - &ExistsResult, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_DownloadChunk"); - - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = - m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) - { - BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash); - } - if (BuildBlob) - { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond); - } - else - { - if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= - m_Options.LargeAttachmentSize) - { - DownloadLargeBlob(*m_Storage.BuildStorage, - m_TempDownloadFolderPath, - m_BuildId, - ChunkHash, - 
m_Options.PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats, - [this, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - ChunkHash, - TotalPartWriteCount, - TotalRequestCount, - &WritePartsComplete, - &FilteredWrittenBytesPerSecond, - &FilteredDownloadedBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs](IoBuffer&& Payload) mutable { - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && m_Storage.BuildCacheStorage) - { - m_Storage.BuildCacheStorage->PutBuildBlob( - m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!m_Options.PrimeCacheOnly) - { - if (!m_AbortFlag) - { - AsyncWriteDownloadedChunk( - m_Options.ZenFolderPath, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond); - } - } - }); - } - else - { - BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); - if (BuildBlob && m_Storage.BuildCacheStorage) - { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - if (!m_Options.PrimeCacheOnly) - { - if (!m_AbortFlag) - { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - std::move(BuildBlob), - 
SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond); - } - } - } - } - } - }); - } - } - } - }); + Work.ScheduleWork(m_IOWorkerPool, + [this, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &ExistsResult, + &WritePartsComplete, + &LooseChunkHashWorks, + LooseChunkHashWorkIndex, + TotalRequestCount, + TotalPartWriteCount, + &WriteCache, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { + ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); + if (!m_AbortFlag) + { + LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; + const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; + WriteLooseChunk(RemoteChunkIndex, + ExistsResult, + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + std::move(LooseChunkHashWork.ChunkTargetPtrs), + WriteCache, + Work, + TotalRequestCount, + TotalPartWriteCount, + FilteredDownloadedBytesPerSecond, + FilteredWrittenBytesPerSecond); + } + }); } std::unique_ptr<CloneQueryInterface> CloneQuery; @@ -2191,9 +1669,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { break; } - const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; - ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); - const uint32_t BlockIndex = BlockRange.BlockIndex; Work.ScheduleWork( m_NetworkPool, [this, @@ -2207,168 +1682,109 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &Work, - BlockIndex, - BlockRange](std::atomic<bool>&) { + &BlockRangeWorks, + BlockRangeIndex](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlock"); - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer; - if (m_Storage.BuildCacheStorage && 
ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) - { - BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeStart + BlockRange.RangeLength)); - } - if (!m_AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += BlockSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex]; + + DownloadPartialBlock( + BlockRange, + ExistsResult, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { - ZEN_TRACE_CPU("MoveTempPartialBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - 
BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } + FilteredDownloadedBytesPerSecond.Stop(); } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("WriteTempPartialBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - m_TempBlockFolderPath / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &BlockRange, + BlockChunkPath = std::move(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePartialBlock"); - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockIndex, - BlockRange, - BlockChunkPath, - BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); + const uint32_t BlockIndex = BlockRange.BlockIndex; - const ChunkBlockDescription& BlockDescription = 
m_BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockChunksToCache( - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - LOG_OUTPUT_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } + if (!WritePartialBlockChunksToCache( + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + 
RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + LOG_OUTPUT_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } + + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }); - } - } + }); + } + }); } }); } @@ -3716,6 +3132,580 @@ BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem RenameFile(TempFilePath, CacheFilePath); } +void +BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::atomic<uint64_t>& WritePartsComplete, + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + ParallelWork& Work, + uint64_t TotalRequestCount, + uint64_t TotalPartWriteCount, + FilteredRate& FilteredDownloadedBytesPerSecond, + FilteredRate& FilteredWrittenBytesPerSecond) +{ + std::filesystem::path ExistingCompressedChunkPath; + if (!m_Options.PrimeCacheOnly) + { + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); + if (!ExistingCompressedChunkPath.empty()) + { + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + } + if (!m_AbortFlag) + { + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + 
&WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs = std::move(ChunkTargetPtrs), + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); + + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); + } + + bool NeedHashVerify = + WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + WritePartsComplete++; + + if (!AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + std::error_code Ec = TryRemoveFile(CompressedChunkPath); + if (Ec) + { + LOG_OUTPUT_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + CompressedChunkPath, + Ec.value(), + Ec.message()); + } + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + } + else + { + FinalizeChunkSequences(CompletedSequences); + } + } + } + }); + } + else + { + Work.ScheduleWork(m_NetworkPool, + [this, + &ExistsResult, + SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs = std::move(ChunkTargetPtrs)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadChunk"); + + 
                        // First download scheduled: start measuring network throughput.
                        FilteredDownloadedBytesPerSecond.Start();
                        DownloadBuildBlob(RemoteChunkIndex,
                                          ExistsResult,
                                          Work,
                                          [this,
                                           &ExistsResult,
                                           SequenceIndexChunksLeftToWriteCounters,
                                           &WriteCache,
                                           &Work,
                                           &WritePartsComplete,
                                           TotalPartWriteCount,
                                           TotalRequestCount,
                                           RemoteChunkIndex,
                                           &FilteredDownloadedBytesPerSecond,
                                           &FilteredWrittenBytesPerSecond,
                                           ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
                                              // When the last outstanding request completes, stop the download-rate tracker.
                                              if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
                                              {
                                                  FilteredDownloadedBytesPerSecond.Stop();
                                              }
                                              // Hand the payload off to the async write path that copies it
                                              // into the write cache for all chunk targets.
                                              AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
                                                                        RemoteChunkIndex,
                                                                        std::move(ChunkTargetPtrs),
                                                                        WriteCache,
                                                                        Work,
                                                                        std::move(Payload),
                                                                        SequenceIndexChunksLeftToWriteCounters,
                                                                        WritePartsComplete,
                                                                        TotalPartWriteCount,
                                                                        FilteredWrittenBytesPerSecond);
                                          });
                    }
                });
        }
    }
}

/// Fetches the compressed payload for remote chunk `RemoteChunkIndex`, preferring
/// the local build cache (per `ExistsResult`) over remote build storage. Large
/// attachments (>= m_Options.LargeAttachmentSize raw bytes) go through the
/// multipart `DownloadLargeBlob` path; smaller ones are fetched directly and
/// written back into the cache. Updates m_DownloadStats counters and invokes
/// `OnDownloaded` with the payload. Throws std::runtime_error if the chunk is
/// missing from both cache and remote storage.
void
BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t                                RemoteChunkIndex,
                                               const BlobsExistsResult&                ExistsResult,
                                               ParallelWork&                           Work,
                                               std::function<void(IoBuffer&& Payload)>&& OnDownloaded)
{
    const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
    // FilteredDownloadedBytesPerSecond.Start();
    IoBuffer   BuildBlob;
    const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
    if (ExistsInCache)
    {
        // Cache hit candidate: try to read the blob locally first.
        BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
    }
    if (BuildBlob)
    {
        // Served from local cache; account for it as a completed download.
        uint64_t BlobSize = BuildBlob.GetSize();
        m_DownloadStats.DownloadedChunkCount++;
        m_DownloadStats.DownloadedChunkByteCount += BlobSize;
        m_DownloadStats.RequestsCompleteCount++;
        OnDownloaded(std::move(BuildBlob));
    }
    else
    {
        if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize)
        {
            // Large attachment: download in parallel multipart pieces to a temp file.
            DownloadLargeBlob(
                *m_Storage.BuildStorage,
                m_TempDownloadFolderPath,
                m_BuildId,
                ChunkHash,
                m_Options.PreferredMultipartChunkSize,
                Work,
                m_NetworkPool,
                m_DownloadStats.DownloadedChunkByteCount,
                m_DownloadStats.MultipartAttachmentCount,
                [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable {
                    m_DownloadStats.DownloadedChunkCount++;
                    m_DownloadStats.RequestsCompleteCount++;

                    // Populate the local cache so subsequent updates avoid the network.
                    if (Payload && m_Storage.BuildCacheStorage)
                    {
                        m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
                                                                  ChunkHash,
                                                                  ZenContentType::kCompressedBinary,
                                                                  CompositeBuffer(SharedBuffer(Payload)));
                    }

                    OnDownloaded(std::move(Payload));
                });
        }
        else
        {
            // Small attachment: plain synchronous fetch from remote storage.
            BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
            if (BuildBlob && m_Storage.BuildCacheStorage)
            {
                // Write-through to the local cache.
                m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
                                                          ChunkHash,
                                                          ZenContentType::kCompressedBinary,
                                                          CompositeBuffer(SharedBuffer(BuildBlob)));
            }
            if (!BuildBlob)
            {
                throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
            }
            if (!m_Options.PrimeCacheOnly)
            {
                if (!m_AbortFlag)
                {
                    uint64_t BlobSize = BuildBlob.GetSize();
                    m_DownloadStats.DownloadedChunkCount++;
                    m_DownloadStats.DownloadedChunkByteCount += BlobSize;
                    m_DownloadStats.RequestsCompleteCount++;

                    OnDownloaded(std::move(BuildBlob));
                }
            }
        }
    }
}

/// Merges an ordered run of ranges belonging to one block into a single
/// descriptor spanning from the first range's start to the last range's end
/// (including any gaps between them).
BuildsOperationUpdateFolder::BlockRangeDescriptor
BuildsOperationUpdateFolder::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
{
    ZEN_ASSERT(Ranges.size() > 1);
    const BlockRangeDescriptor& First = Ranges.front();
    const BlockRangeDescriptor& Last  = Ranges.back();

    return BlockRangeDescriptor{.BlockIndex           = First.BlockIndex,
                                .RangeStart           = First.RangeStart,
                                .RangeLength          = Last.RangeStart + Last.RangeLength - First.RangeStart,
                                .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
                                .ChunkBlockIndexCount =
                                    Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
}

std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
// Wraps a single merged range for return from CalculateBlockRanges: yields
// nullopt when the range covers the entire block (callers then fetch the whole
// block rather than issuing a ranged request).
BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
{
    if (Range.RangeLength == TotalBlockSize)
    {
        // Whole-block coverage is signalled with an empty optional.
        return {};
    }
    else
    {
        return std::vector<BlockRangeDescriptor>{Range};
    }
};

// Returns the first limit rule that the given set of ranges violates (requested
// size percentage at or above Limit.SizePercent AND more ranges than
// Limit.MaxRangeCount), or nullptr when no forced merge is required.
// Single-range inputs never trigger a limit.
const BuildsOperationUpdateFolder::BlockRangeLimit*
BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit>      Limits,
                                                        uint64_t                              TotalBlockSize,
                                                        std::span<const BlockRangeDescriptor> Ranges)
{
    if (Ranges.size() > 1)
    {
        // Total bytes actually wanted across all ranges.
        const std::uint64_t WantedSize =
            std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
                return Current + Range.RangeLength;
            });

        const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;

        for (const BlockRangeLimit& Limit : Limits)
        {
            if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
            {
                return &Limit;
            }
        }
    }
    return nullptr;
};

// Merges adjacent ranges whose separating gap is small — either within
// AlwaysAcceptableGap or at most 1/16th of the two ranges' combined size —
// trading some wasted downloaded bytes for fewer range requests.
// Expects BlockRanges sorted by RangeStart.
std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>
BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
{
    ZEN_ASSERT(BlockRanges.size() > 1);
    std::vector<BlockRangeDescriptor> CollapsedBlockRanges;

    auto BlockRangesIt = BlockRanges.begin();
    CollapsedBlockRanges.push_back(*BlockRangesIt++);
    for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
    {
        BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();

        const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;

        const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
        if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
        {
            // Absorb the next range (and the gap) into the previous one.
            LastRange.ChunkBlockIndexCount =
                (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
            LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
        }
        else
        {
            CollapsedBlockRanges.push_back(*BlockRangesIt);
        }
    }

    return CollapsedBlockRanges;
};

// Smallest gap between any two consecutive ranges, rounded up to a 16 KiB
// multiple. Used to grow the acceptable gap on each CollapseBlockRanges
// iteration, guaranteeing at least one merge per pass.
uint64_t
BuildsOperationUpdateFolder::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
{
    ZEN_ASSERT(BlockRanges.size() > 1);
    uint64_t AcceptableGap = (uint64_t)-1;
    for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
    {
        const BlockRangeDescriptor& Range     = BlockRanges[RangeIndex];
        const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];

        const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
        AcceptableGap      = Min(Gap, AcceptableGap);
    }
    AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
    return AcceptableGap;
};

// Computes the byte ranges to request from a block so all needed chunks
// (BlockChunkIndexNeeded, expected sorted ascending) are covered, then merges
// and collapses ranges per the configured limits. Returns nullopt when the
// whole block should be fetched instead; OutTotalWantedChunksSize receives the
// byte count of the chunks actually needed (excluding merge waste).
std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t                     BlockIndex,
                                                  const ChunkBlockDescription& BlockDescription,
                                                  std::span<const uint32_t>    BlockChunkIndexNeeded,
                                                  bool                         LimitToSingleRange,
                                                  const uint64_t               ChunkStartOffsetInBlock,
                                                  const uint64_t               TotalBlockSize,
                                                  uint64_t&                    OutTotalWantedChunksSize)
{
    ZEN_TRACE_CPU("CalculateBlockRanges");

    std::vector<BlockRangeDescriptor> BlockRanges;
    {
        uint64_t             CurrentOffset             = ChunkStartOffsetInBlock;
        uint32_t             ChunkBlockIndex           = 0;
        uint32_t             NeedBlockChunkIndexOffset = 0;
        BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
        // Walk every chunk in the block, accumulating contiguous needed chunks
        // into a range and flushing it whenever an unneeded chunk intervenes.
        while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
        {
            const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
            if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
            {
                // Chunk not needed: close any open range and skip past it.
                if (NextRange.RangeLength > 0)
                {
                    BlockRanges.push_back(NextRange);
                    NextRange = {.BlockIndex = BlockIndex};
                }
                ChunkBlockIndex++;
                CurrentOffset += ChunkCompressedLength;
            }
            else if (ChunkBlockIndex ==
                     BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
            {
                // Needed chunk: open a new range if necessary and extend it.
                if (NextRange.RangeLength == 0)
                {
                    NextRange.RangeStart           = CurrentOffset;
                    NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
                }
                NextRange.RangeLength += ChunkCompressedLength;
                NextRange.ChunkBlockIndexCount++;
                ChunkBlockIndex++;
                CurrentOffset += ChunkCompressedLength;
                NeedBlockChunkIndexOffset++;
            }
            else
            {
                // Reaching here means BlockChunkIndexNeeded was not sorted ascending.
                ZEN_ASSERT(false);
            }
        }
        if (NextRange.RangeLength > 0)
        {
            BlockRanges.push_back(NextRange);
        }
    }
    ZEN_ASSERT(!BlockRanges.empty());

    OutTotalWantedChunksSize =
        std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
            return Current + Range.RangeLength;
        });

    double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;

    if (BlockRanges.size() == 1)
    {
        // A single contiguous range needs no merging or collapsing.
        if (m_Options.IsVerbose)
        {
            LOG_OUTPUT(m_LogOutput,
                       "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
                       NiceBytes(OutTotalWantedChunksSize),
                       RangeWantedPercent,
                       BlockDescription.BlockHash,
                       NiceBytes(TotalBlockSize));
        }
        return BlockRanges;
    }

    if (LimitToSingleRange)
    {
        // Caller only supports one range request: merge everything, accepting the waste.
        const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
        if (m_Options.IsVerbose)
        {
            const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
            const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;

            LOG_OUTPUT(
                m_LogOutput,
                "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
                "{:.2f}% ({})",
                NiceBytes(OutTotalWantedChunksSize),
                RangeWantedPercent,
                BlockRanges.size(),
                BlockDescription.BlockHash,
                NiceBytes(TotalBlockSize),
                NiceBytes(MergedRange.RangeLength),
                RangeRequestedPercent,
                WastedPercent,
                NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
        }
        return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
    }

    if (RangeWantedPercent > FullBlockRangePercentLimit)
    {
        // Wanting nearly the whole block: a single merged range (or the full block) is cheaper.
        const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
        if (m_Options.IsVerbose)
        {
            const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
            const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;

            LOG_OUTPUT(m_LogOutput,
                       "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
                       "({:.2f}%) wasting {:.2f}% ({})",
                       NiceBytes(OutTotalWantedChunksSize),
                       RangeWantedPercent,
                       BlockRanges.size(),
                       BlockDescription.BlockHash,
                       NiceBytes(TotalBlockSize),
                       FullBlockRangePercentLimit,
                       NiceBytes(MergedRange.RangeLength),
                       RangeRequestedPercent,
                       WastedPercent,
                       NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
        }
        return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
    }

    // Collapse 16 KiB gaps first, then keep collapsing with progressively larger
    // gaps until no force-merge limit is violated.
    std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
    while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
    {
        CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
    }

    const std::uint64_t WantedCollapsedSize =
        std::accumulate(CollapsedBlockRanges.begin(),
                        CollapsedBlockRanges.end(),
                        uint64_t(0),
                        [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });

    const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;

    if (m_Options.IsVerbose)
    {
        const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;

        LOG_OUTPUT(
            m_LogOutput,
            "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
            "({})",
            NiceBytes(OutTotalWantedChunksSize),
            RangeWantedPercent,
            BlockRanges.size(),
            BlockDescription.BlockHash,
            NiceBytes(TotalBlockSize),
            NiceBytes(WantedCollapsedSize),
            CollapsedRangeRequestedPercent,
            CollapsedBlockRanges.size(),
            WastedPercent,
            NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
    }
    return CollapsedBlockRanges;
}

// Downloads the byte range described by BlockRange from its compressed block,
// preferring the build cache over remote storage, and records block download
// statistics. Large results are handed over as an on-disk file (renamed into
// the temp block folder when the buffer is file-backed, otherwise written out
// when larger than 512 KiB) rather than an in-memory buffer; exactly one of
// the two OnDownloaded arguments is populated.
void
BuildsOperationUpdateFolder::DownloadPartialBlock(
    const BlockRangeDescriptor BlockRange,
    const BlobsExistsResult&   ExistsResult,
    std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded)
{
    const uint32_t BlockIndex = BlockRange.BlockIndex;

    const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];

    IoBuffer BlockBuffer;
    if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
    {
        // Cache first: ranged read from the locally cached block.
        BlockBuffer =
            m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
    }
    if (!BlockBuffer)
    {
        // Fall back to a ranged request against remote build storage.
        BlockBuffer =
            m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
    }
    if (!BlockBuffer)
    {
        throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
                                             BlockDescription.BlockHash,
                                             BlockRange.RangeStart,
                                             BlockRange.RangeStart + BlockRange.RangeLength));
    }
    if (!m_AbortFlag)
    {
        uint64_t BlockSize = BlockBuffer.GetSize();
        m_DownloadStats.DownloadedBlockCount++;
        m_DownloadStats.DownloadedBlockByteCount += BlockSize;
        m_DownloadStats.RequestsCompleteCount++;

        std::filesystem::path BlockChunkPath;

        // Check if the downloaded block is file based and we can move it directly without rewriting it
        {
            IoBufferFileReference FileRef;
            if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
            {
                ZEN_TRACE_CPU("MoveTempPartialBlock");

                std::error_code       Ec;
                std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
                if (!Ec)
                {
                    // Detach delete-on-close and rename the temp file into the block folder.
                    BlockBuffer.SetDeleteOnClose(false);
                    BlockBuffer    = {};
                    BlockChunkPath = m_TempBlockFolderPath /
                                     fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
                    RenameFile(TempBlobPath, BlockChunkPath, Ec);
                    if (Ec)
                    {
                        // Rename failed: fall back to the in-memory/write paths below.
                        BlockChunkPath = std::filesystem::path{};

                        // Re-open the temp file again
                        BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
                        BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
                        BlockBuffer.SetDeleteOnClose(true);
                    }
                }
            }
        }

        if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
        {
            ZEN_TRACE_CPU("WriteTempPartialBlock");
            // Could not be moved and rather large, lets store it on disk
            BlockChunkPath = m_TempBlockFolderPath /
                             fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
            TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
            BlockBuffer = {};
        }
        if (!m_AbortFlag)
        {
            OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath));
        }
    }
}

 std::vector<uint32_t>
 BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
                                                     const CopyChunkData& CopyData,
@@ -5739,7 +5729,6 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
             ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
             FilteredGeneratedBytesPerSecond.Start();
-            // TODO: Convert ScheduleWork body to function
             Stopwatch        GenerateTimer;
             CompressedBuffer CompressedBlock =
@@ -5816,7 +5805,6 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
             ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
             FilteredUploadedBytesPerSecond.Start();
-            // TODO: Convert ScheduleWork body to function
             const CbObject BlockMetaData =
                 BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
@@ -6890,13 +6878,15 @@ BuildsOperationValidateBuildPart::Execute()
                           PreferredMultipartChunkSize,
                           Work,
                           m_NetworkPool,
-                          m_DownloadStats,
+                          m_DownloadStats.DownloadedChunkByteCount,
+                          m_DownloadStats.MultipartAttachmentCount,
                           [this,
                            &Work,
                            AttachmentsToVerifyCount,
                            &FilteredDownloadedBytesPerSecond,
                            &FilteredVerifiedBytesPerSecond,
                            ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+                              m_DownloadStats.DownloadedChunkCount++;
                               Payload.SetContentType(ZenContentType::kCompressedBinary);
                               if (!m_AbortFlag)
                               {
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index be6720d5d..fcf880e48 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -12,6 +12,10 @@
 #include <atomic>
 #include <memory>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
 namespace zen {
 class CloneQueryInterface;
@@ -240,6 +244,61 @@ private:
         std::filesystem::path Path;
     };
+    // Describes one scavenged chunk sequence to copy into the local cache.
+    struct ScavengedSequenceCopyOperation
+    {
+        uint32_t ScavengedContentIndex = (uint32_t)-1;
+        uint32_t ScavengedPathIndex    = (uint32_t)-1;
+        uint32_t RemoteSequenceIndex   = (uint32_t)-1;
+        // NOTE(review): 64-bit field initialized with a 32-bit sentinel
+        // (0xFFFFFFFF, not 0xFFFFFFFFFFFFFFFF) — likely meant (uint64_t)-1; confirm.
+        uint64_t RawSize               = (uint32_t)-1;
+    };
+
+    // Source sequence plus the cache-file targets each of its chunks maps to.
+    struct CopyChunkData
+    {
+        uint32_t                                                       ScavengeSourceIndex = (uint32_t)-1;
+        uint32_t                                                       SourceSequenceIndex = (uint32_t)-1;
+        std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
+        struct ChunkTarget
+        {
+            uint32_t TargetChunkLocationCount = (uint32_t)-1;
+            uint32_t RemoteChunkIndex         = (uint32_t)-1;
+            uint64_t CacheFileOffset          = (uint64_t)-1;
+        };
+        std::vector<ChunkTarget> ChunkTargets;
+    };
+
+    // Result of a blob-existence query plus how long the query took.
+    struct BlobsExistsResult
+    {
+        tsl::robin_set<IoHash> ExistingBlobs;
+        uint64_t               ElapsedTimeMs = 0;
+    };
+
+    // One byte range within a compressed block, plus which block-chunk indices it covers.
+    struct BlockRangeDescriptor
+    {
+        uint32_t BlockIndex           = (uint32_t)-1;
+        uint64_t RangeStart           = 0;
+        uint64_t RangeLength          = 0;
+        uint32_t ChunkBlockIndexStart = 0;
+        uint32_t ChunkBlockIndexCount = 0;
+    };
+
+    // Force-merge rule: when a request wants >= SizePercent of a block but uses
+    // more than MaxRangeCount ranges, the ranges must be collapsed further.
+    struct BlockRangeLimit
+    {
+        uint16_t SizePercent;
+        uint16_t MaxRangeCount;
+    };
+
+    // Above this wanted percentage a request is merged to a single range (or full block).
+    static constexpr uint16_t FullBlockRangePercentLimit = 95;
+
+    // Ordered most-restrictive-first; evaluated by GetBlockRangeLimitForRange.
+    static constexpr BuildsOperationUpdateFolder::BlockRangeLimit ForceMergeLimits[] = {
+        {.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
+        {.SizePercent = 90, .MaxRangeCount = 2},
+        {.SizePercent = 85, .MaxRangeCount = 8},
+        {.SizePercent = 80, .MaxRangeCount = 16},
+        {.SizePercent = 70, .MaxRangeCount = 32},
+        {.SizePercent = 60, .MaxRangeCount = 48},
+        {.SizePercent = 2, .MaxRangeCount = 56},
+        {.SizePercent = 0, .MaxRangeCount = 64}};
+
     void ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound,
                          tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound);
     void ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound);
@@ -265,31 +324,46 @@ private:
     uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex);
-    struct ScavengedSequenceCopyOperation
-    {
-        uint32_t ScavengedContentIndex = (uint32_t)-1;
-        uint32_t ScavengedPathIndex    = (uint32_t)-1;
-        uint32_t RemoteSequenceIndex   = (uint32_t)-1;
-        uint64_t RawSize               = (uint32_t)-1;
-    };
-
     void WriteScavengedSequenceToCache(const std::filesystem::path&    ScavengeRootPath,
                                        const ChunkedFolderContent&     ScavengedContent,
                                        const ScavengedSequenceCopyOperation& ScavengeOp);
-    struct CopyChunkData
-    {
-        uint32_t                                                       ScavengeSourceIndex = (uint32_t)-1;
-        uint32_t                                                       SourceSequenceIndex = (uint32_t)-1;
-        std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
-        struct ChunkTarget
-        {
-            uint32_t TargetChunkLocationCount = (uint32_t)-1;
-            uint32_t RemoteChunkIndex         = (uint32_t)-1;
-            uint64_t CacheFileOffset          = (uint64_t)-1;
-        };
-        std::vector<ChunkTarget> ChunkTargets;
-    };
+    void WriteLooseChunk(const uint32_t                                                    RemoteChunkIndex,
+                         const BlobsExistsResult&                                          ExistsResult,
+                         std::span<std::atomic<uint32_t>>                                  SequenceIndexChunksLeftToWriteCounters,
+                         std::atomic<uint64_t>&                                            WritePartsComplete,
+                         std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+                         BufferedWriteFileCache&                                           WriteCache,
+                         ParallelWork&                                                     Work,
+                         uint64_t                                                          TotalRequestCount,
+                         uint64_t                                                          TotalPartWriteCount,
+                         FilteredRate&                                                     FilteredDownloadedBytesPerSecond,
+                         FilteredRate&                                                     FilteredWrittenBytesPerSecond);
+
+    void DownloadBuildBlob(uint32_t                                  RemoteChunkIndex,
+                           const BlobsExistsResult&                  ExistsResult,
+                           ParallelWork&                             Work,
+                           std::function<void(IoBuffer&& Payload)>&& OnDownloaded);
+
+    BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges);
+    std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize,
+                                                                                  const BlockRangeDescriptor& Range);
+    const BlockRangeLimit* GetBlockRangeLimitForRange(std::span<const BlockRangeLimit>      Limits,
+                                                      uint64_t                              TotalBlockSize,
+                                                      std::span<const BlockRangeDescriptor> Ranges);
+    std::vector<BlockRangeDescriptor> CollapseBlockRanges(const uint64_t                        AlwaysAcceptableGap,
+                                                          std::span<const BlockRangeDescriptor> BlockRanges);
+    uint64_t CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges);
+    std::optional<std::vector<BlockRangeDescriptor>> CalculateBlockRanges(uint32_t                     BlockIndex,
+                                                                          const ChunkBlockDescription& BlockDescription,
+                                                                          std::span<const uint32_t>    BlockChunkIndexNeeded,
+                                                                          bool                         LimitToSingleRange,
+                                                                          const uint64_t               ChunkStartOffsetInBlock,
+                                                                          const uint64_t               TotalBlockSize,
+                                                                          uint64_t&                    OutTotalWantedChunksSize);
+    void DownloadPartialBlock(const BlockRangeDescriptor BlockRange,
+                              const BlobsExistsResult&   ExistsResult,
+                              std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded);
     std::vector<uint32_t> WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
                                                  const CopyChunkData& CopyData,