diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-31 13:18:59 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-31 13:18:59 +0200 |
| commit | 966790fec2457b6c01c7bd51d1e01cb0cf0e069e (patch) | |
| tree | 7dce63b16ef63da33e00cc4806e80fc20d9046f4 /src | |
| parent | fix jupiterbuildstorage concurrency (#906) (diff) | |
| download | zen-966790fec2457b6c01c7bd51d1e01cb0cf0e069e.tar.xz zen-966790fec2457b6c01c7bd51d1e01cb0cf0e069e.zip | |
fix potential race with stats counters missing when to Stop filtered values (#907)
* fix potential race with stats counters missing when to Stop filtered values
* fix off by one in PutMultipartBuildBlob retry path
* use move operation instead of copy operation PutMultipartBlob
* fix filter Stop() for upload operations and fix bug with generateblock count filter
Diffstat (limited to 'src')
4 files changed, 92 insertions, 79 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index a04063c4c..ca226444a 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -1149,8 +1149,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } @@ -1252,10 +1251,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ScavengedLookups, ScavengedPaths, WriteCache); - WritePartsComplete++; + bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount; if (!m_AbortFlag) { - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsDone) { FilteredWrittenBytesPerSecond.Stop(); } @@ -1334,9 +1333,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Ec.message()); } - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } @@ -1389,25 +1386,20 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) BlockRangeStartIndex, RangeCount, ExistsResult, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WritePartsComplete, &WriteCache, &Work, - TotalRequestCount, TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, &PartialBlocks](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath, size_t BlockRangeStartIndex, std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!m_AbortFlag) { Work.ScheduleWork( @@ -1483,8 +1475,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } @@ -1571,8 +1562,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -1683,9 +1673,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } } - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } @@ -2987,8 +2975,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); if (!ExistingCompressedChunkPath.empty()) { - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -3027,11 +3014,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); - WritePartsComplete++; + bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount; if (!AbortFlag) { - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsDone) { FilteredWrittenBytesPerSecond.Stop(); } @@ -3085,6 +3072,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd DownloadBuildBlob(RemoteChunkIndex, ExistsResult, Work, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, [this, &ExistsResult, SequenceIndexChunksLeftToWriteCounters, @@ -3092,15 +3081,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd &Work, &WritePartsComplete, TotalPartWriteCount, - TotalRequestCount, RemoteChunkIndex, - &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } IoBufferFileReference FileRef; bool EnableBacklog = Payload.GetFileReference(FileRef); AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, @@ -3125,6 +3108,8 @@ void BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, ParallelWork& Work, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, std::function<void(IoBuffer&& Payload)>&& OnDownloaded) { const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; @@ -3140,37 +3125,48 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } OnDownloaded(std::move(BuildBlob)); } else { if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize) { - DownloadLargeBlob( - *m_Storage.BuildStorage, - m_TempDownloadFolderPath, - m_BuildId, - ChunkHash, - m_Options.PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats.DownloadedChunkByteCount, - m_DownloadStats.MultipartAttachmentCount, - [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable { - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.RequestsCompleteCount++; + DownloadLargeBlob(*m_Storage.BuildStorage, + m_TempDownloadFolderPath, + m_BuildId, + ChunkHash, + m_Options.PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + m_DownloadStats.MultipartAttachmentCount, + [this, + &Work, + &FilteredDownloadedBytesPerSecond, + ChunkHash, + RemoteChunkIndex, + TotalRequestCount, + OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable { + m_DownloadStats.DownloadedChunkCount++; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } + if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) + { + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } - OnDownloaded(std::move(Payload)); - }); + OnDownloaded(std::move(Payload)); + }); } else { @@ -3193,7 +3189,10 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } OnDownloaded(std::move(BuildBlob)); } @@ -3208,6 +3207,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( size_t BlockRangeStartIndex, size_t BlockRangeCount, const BlobsExistsResult& ExistsResult, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath, size_t BlockRangeStartIndex, @@ -3222,6 +3223,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( IoBuffer&& BlockRangeBuffer, size_t BlockRangeStartIndex, std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, const std::function<void(IoBuffer && InMemoryBuffer, const std::filesystem::path& OnDiskPath, size_t BlockRangeStartIndex, @@ -3229,7 +3232,11 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize; - m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size(); + if (m_DownloadStats.RequestsCompleteCount.fetch_add(BlockOffsetAndLengths.size()) + BlockOffsetAndLengths.size() == + TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } std::filesystem::path BlockChunkPath; @@ -3337,6 +3344,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::move(PayloadBuffer), SubRangeStartIndex, std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; @@ -3361,6 +3370,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + TotalRequestCount, + FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; @@ -3371,6 +3382,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; @@ -3413,6 +3426,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + TotalRequestCount, + FilteredDownloadedBytesPerSecond, OnDownloaded); } else @@ -3428,6 +3443,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, OnDownloaded); } } @@ -4244,8 +4261,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); if (!m_AbortFlag) { - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } @@ -6111,8 +6127,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co TempUploadStats.BlockCount++; - UploadedBlockCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) + if (UploadedBlockCount.fetch_add(1) + 1 == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } @@ -6192,8 +6207,8 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co if (IsComplete) { TempUploadStats.ChunkCount++; - UploadedChunkCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) + if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && + UploadedBlockCount == UploadBlockCount) { FilteredUploadedBytesPerSecond.Stop(); } @@ -6227,8 +6242,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co TempUploadStats.ChunkCount++; UploadedCompressedChunkSize += Payload.GetSize(); UploadedRawChunkSize += RawSize; - UploadedChunkCount++; - if (UploadedChunkCount == UploadChunkCount) + if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && UploadedBlockCount == UploadBlockCount) { FilteredUploadedBytesPerSecond.Stop(); } @@ -6237,8 +6251,6 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co }); }; - std::vector<size_t> GenerateBlockIndexes; - std::atomic<uint64_t> GeneratedBlockCount = 0; std::atomic<uint64_t> GeneratedBlockByteCount = 0; @@ -6260,9 +6272,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co &Lookup, &NewBlocks, &NewBlockChunks, - &GenerateBlockIndexes, &GeneratedBlockCount, &GeneratedBlockByteCount, + GenerateBlockCount = BlockIndexes.size(), &AsyncUploadBlock, &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) { if (!m_AbortFlag) @@ -6293,8 +6305,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co } GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GeneratedBlockCount++; - if (GeneratedBlockCount == GenerateBlockIndexes.size()) + if (GeneratedBlockCount.fetch_add(1) + 1 == GenerateBlockCount) { FilteredGenerateBlockBytesPerSecond.Stop(); } @@ -7005,8 +7016,7 @@ BuildsOperationPrimeCache::Execute() CompositeBuffer(SharedBuffer(Payload))); } } - CompletedDownloadCount++; - if (CompletedDownloadCount == BlobCount) + if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -7029,8 +7039,7 @@ BuildsOperationPrimeCache::Execute() CompositeBuffer(SharedBuffer(std::move(Payload)))); } } - CompletedDownloadCount++; - if (CompletedDownloadCount == BlobCount) + if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp index fa1a69f18..d837ce07f 100644 --- a/src/zenremotestore/builds/jupiterbuildstorage.cpp +++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp @@ -263,7 +263,7 @@ public: std::vector<std::function<void()>> WorkList; for (auto& WorkItem : WorkItems) { - WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { + WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes = std::move(OnSentBytes)]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); bool IsComplete = false; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 0d2eded58..27dc9de86 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -261,12 +261,16 @@ private: void DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, ParallelWork& Work, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, std::function<void(IoBuffer&& Payload)>&& OnDownloaded); - void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, - size_t BlockRangeIndex, - size_t BlockRangeCount, - const BlobsExistsResult& ExistsResult, + void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, + size_t BlockRangeIndex, + size_t BlockRangeCount, + const BlobsExistsResult& ExistsResult, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath, size_t BlockRangeStartIndex, diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp index a9788cb4e..d610d1fc8 100644 --- a/src/zenremotestore/jupiter/jupitersession.cpp +++ b/src/zenremotestore/jupiter/jupitersession.cpp @@ -673,7 +673,7 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; IoBuffer RetryPartPayload = - Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); + Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte); std::string RetryMultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", Namespace, |