aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-31 13:18:59 +0200
committerGitHub Enterprise <[email protected]>2026-03-31 13:18:59 +0200
commit966790fec2457b6c01c7bd51d1e01cb0cf0e069e (patch)
tree7dce63b16ef63da33e00cc4806e80fc20d9046f4 /src
parentfix jupiterbuildstorage concurrency (#906) (diff)
downloadzen-966790fec2457b6c01c7bd51d1e01cb0cf0e069e.tar.xz
zen-966790fec2457b6c01c7bd51d1e01cb0cf0e069e.zip
fix potential race with stats counters missing when to Stop filtered values (#907)
* fix potential race with stats counters missing when to Stop filtered values * fix off by one in PutMultipartBuildBlob retry path * use move operation instead of copy operation PutMultipartBlob * fix filter Stop() for upload operations and fix bug with generateblock count filter
Diffstat (limited to 'src')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp155
-rw-r--r--src/zenremotestore/builds/jupiterbuildstorage.cpp2
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h12
-rw-r--r--src/zenremotestore/jupiter/jupitersession.cpp2
4 files changed, 92 insertions, 79 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index a04063c4c..ca226444a 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -1149,8 +1149,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -1252,10 +1251,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
ScavengedLookups,
ScavengedPaths,
WriteCache);
- WritePartsComplete++;
+ bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount;
if (!m_AbortFlag)
{
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsDone)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -1334,9 +1333,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Ec.message());
}
- WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -1389,25 +1386,20 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
BlockRangeStartIndex,
RangeCount,
ExistsResult,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
[this,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
&WritePartsComplete,
&WriteCache,
&Work,
- TotalRequestCount,
TotalPartWriteCount,
- &FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
&PartialBlocks](IoBuffer&& InMemoryBuffer,
const std::filesystem::path& OnDiskPath,
size_t BlockRangeStartIndex,
std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
if (!m_AbortFlag)
{
Work.ScheduleWork(
@@ -1483,8 +1475,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -1571,8 +1562,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
uint64_t BlockSize = BlockBuffer.GetSize();
m_DownloadStats.DownloadedBlockCount++;
m_DownloadStats.DownloadedBlockByteCount += BlockSize;
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -1683,9 +1673,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
- WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -2987,8 +2975,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
if (!ExistingCompressedChunkPath.empty())
{
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -3027,11 +3014,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
bool NeedHashVerify =
WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
- WritePartsComplete++;
+ bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount;
if (!AbortFlag)
{
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsDone)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -3085,6 +3072,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
DownloadBuildBlob(RemoteChunkIndex,
ExistsResult,
Work,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
[this,
&ExistsResult,
SequenceIndexChunksLeftToWriteCounters,
@@ -3092,15 +3081,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
&Work,
&WritePartsComplete,
TotalPartWriteCount,
- TotalRequestCount,
RemoteChunkIndex,
- &FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
IoBufferFileReference FileRef;
bool EnableBacklog = Payload.GetFileReference(FileRef);
AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
@@ -3125,6 +3108,8 @@ void
BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex,
const BlobsExistsResult& ExistsResult,
ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
std::function<void(IoBuffer&& Payload)>&& OnDownloaded)
{
const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
@@ -3140,37 +3125,48 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
uint64_t BlobSize = BuildBlob.GetSize();
m_DownloadStats.DownloadedChunkCount++;
m_DownloadStats.DownloadedChunkByteCount += BlobSize;
- m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
OnDownloaded(std::move(BuildBlob));
}
else
{
if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize)
{
- DownloadLargeBlob(
- *m_Storage.BuildStorage,
- m_TempDownloadFolderPath,
- m_BuildId,
- ChunkHash,
- m_Options.PreferredMultipartChunkSize,
- Work,
- m_NetworkPool,
- m_DownloadStats.DownloadedChunkByteCount,
- m_DownloadStats.MultipartAttachmentCount,
- [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable {
- m_DownloadStats.DownloadedChunkCount++;
- m_DownloadStats.RequestsCompleteCount++;
+ DownloadLargeBlob(*m_Storage.BuildStorage,
+ m_TempDownloadFolderPath,
+ m_BuildId,
+ ChunkHash,
+ m_Options.PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats.DownloadedChunkByteCount,
+ m_DownloadStats.MultipartAttachmentCount,
+ [this,
+ &Work,
+ &FilteredDownloadedBytesPerSecond,
+ ChunkHash,
+ RemoteChunkIndex,
+ TotalRequestCount,
+ OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable {
+ m_DownloadStats.DownloadedChunkCount++;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
- {
- m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
+ if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
- OnDownloaded(std::move(Payload));
- });
+ OnDownloaded(std::move(Payload));
+ });
}
else
{
@@ -3193,7 +3189,10 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
uint64_t BlobSize = BuildBlob.GetSize();
m_DownloadStats.DownloadedChunkCount++;
m_DownloadStats.DownloadedChunkByteCount += BlobSize;
- m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
OnDownloaded(std::move(BuildBlob));
}
@@ -3208,6 +3207,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
size_t BlockRangeStartIndex,
size_t BlockRangeCount,
const BlobsExistsResult& ExistsResult,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
std::function<void(IoBuffer&& InMemoryBuffer,
const std::filesystem::path& OnDiskPath,
size_t BlockRangeStartIndex,
@@ -3222,6 +3223,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
IoBuffer&& BlockRangeBuffer,
size_t BlockRangeStartIndex,
std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
const std::function<void(IoBuffer && InMemoryBuffer,
const std::filesystem::path& OnDiskPath,
size_t BlockRangeStartIndex,
@@ -3229,7 +3232,11 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize();
m_DownloadStats.DownloadedBlockCount++;
m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize;
- m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size();
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(BlockOffsetAndLengths.size()) + BlockOffsetAndLengths.size() ==
+ TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
std::filesystem::path BlockChunkPath;
@@ -3337,6 +3344,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::move(PayloadBuffer),
SubRangeStartIndex,
std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)},
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
OnDownloaded);
SubRangeCountComplete += SubRangeCount;
continue;
@@ -3361,6 +3370,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::move(RangeBuffers.PayloadBuffer),
SubRangeStartIndex,
RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
OnDownloaded);
SubRangeCountComplete += SubRangeCount;
continue;
@@ -3371,6 +3382,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::move(RangeBuffers.PayloadBuffer),
SubRangeStartIndex,
RangeBuffers.Ranges,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
OnDownloaded);
SubRangeCountComplete += SubRangeCount;
continue;
@@ -3413,6 +3426,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::move(RangeBuffers.PayloadBuffer),
SubRangeStartIndex,
RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
OnDownloaded);
}
else
@@ -3428,6 +3443,8 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
std::move(RangeBuffers.PayloadBuffer),
SubRangeStartIndex,
RangeBuffers.Ranges,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
OnDownloaded);
}
}
@@ -4244,8 +4261,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
if (!m_AbortFlag)
{
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
@@ -6111,8 +6127,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
TempUploadStats.BlockCount++;
- UploadedBlockCount++;
- if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ if (UploadedBlockCount.fetch_add(1) + 1 == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
{
FilteredUploadedBytesPerSecond.Stop();
}
@@ -6192,8 +6207,8 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
if (IsComplete)
{
TempUploadStats.ChunkCount++;
- UploadedChunkCount++;
- if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount &&
+ UploadedBlockCount == UploadBlockCount)
{
FilteredUploadedBytesPerSecond.Stop();
}
@@ -6227,8 +6242,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
TempUploadStats.ChunkCount++;
UploadedCompressedChunkSize += Payload.GetSize();
UploadedRawChunkSize += RawSize;
- UploadedChunkCount++;
- if (UploadedChunkCount == UploadChunkCount)
+ if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && UploadedBlockCount == UploadBlockCount)
{
FilteredUploadedBytesPerSecond.Stop();
}
@@ -6237,8 +6251,6 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
});
};
- std::vector<size_t> GenerateBlockIndexes;
-
std::atomic<uint64_t> GeneratedBlockCount = 0;
std::atomic<uint64_t> GeneratedBlockByteCount = 0;
@@ -6260,9 +6272,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
&Lookup,
&NewBlocks,
&NewBlockChunks,
- &GenerateBlockIndexes,
&GeneratedBlockCount,
&GeneratedBlockByteCount,
+ GenerateBlockCount = BlockIndexes.size(),
&AsyncUploadBlock,
&QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) {
if (!m_AbortFlag)
@@ -6293,8 +6305,7 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
}
GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GeneratedBlockCount++;
- if (GeneratedBlockCount == GenerateBlockIndexes.size())
+ if (GeneratedBlockCount.fetch_add(1) + 1 == GenerateBlockCount)
{
FilteredGenerateBlockBytesPerSecond.Stop();
}
@@ -7005,8 +7016,7 @@ BuildsOperationPrimeCache::Execute()
CompositeBuffer(SharedBuffer(Payload)));
}
}
- CompletedDownloadCount++;
- if (CompletedDownloadCount == BlobCount)
+ if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
@@ -7029,8 +7039,7 @@ BuildsOperationPrimeCache::Execute()
CompositeBuffer(SharedBuffer(std::move(Payload))));
}
}
- CompletedDownloadCount++;
- if (CompletedDownloadCount == BlobCount)
+ if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
diff --git a/src/zenremotestore/builds/jupiterbuildstorage.cpp b/src/zenremotestore/builds/jupiterbuildstorage.cpp
index fa1a69f18..d837ce07f 100644
--- a/src/zenremotestore/builds/jupiterbuildstorage.cpp
+++ b/src/zenremotestore/builds/jupiterbuildstorage.cpp
@@ -263,7 +263,7 @@ public:
std::vector<std::function<void()>> WorkList;
for (auto& WorkItem : WorkItems)
{
- WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() {
+ WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes = std::move(OnSentBytes)]() {
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
bool IsComplete = false;
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 0d2eded58..27dc9de86 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -261,12 +261,16 @@ private:
void DownloadBuildBlob(uint32_t RemoteChunkIndex,
const BlobsExistsResult& ExistsResult,
ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
std::function<void(IoBuffer&& Payload)>&& OnDownloaded);
- void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
- size_t BlockRangeIndex,
- size_t BlockRangeCount,
- const BlobsExistsResult& ExistsResult,
+ void DownloadPartialBlock(std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
+ size_t BlockRangeIndex,
+ size_t BlockRangeCount,
+ const BlobsExistsResult& ExistsResult,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
std::function<void(IoBuffer&& InMemoryBuffer,
const std::filesystem::path& OnDiskPath,
size_t BlockRangeStartIndex,
diff --git a/src/zenremotestore/jupiter/jupitersession.cpp b/src/zenremotestore/jupiter/jupitersession.cpp
index a9788cb4e..d610d1fc8 100644
--- a/src/zenremotestore/jupiter/jupitersession.cpp
+++ b/src/zenremotestore/jupiter/jupitersession.cpp
@@ -673,7 +673,7 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
IoBuffer RetryPartPayload =
- Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte);
std::string RetryMultipartUploadResponseRequestString =
fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
Namespace,