diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-13 14:14:51 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 14:14:51 +0200 |
| commit | d7220a07a16f1fc927d7a6eeeff49c7a6bd79a5b (patch) | |
| tree | b1e691bfe4c24f37cf5ba3cc96b719594b0479c7 /src | |
| parent | Some minor polish from tourist branch (#949) (diff) | |
| download | zen-d7220a07a16f1fc927d7a6eeeff49c7a6bd79a5b.tar.xz zen-d7220a07a16f1fc927d7a6eeeff49c7a6bd79a5b.zip | |
silence errors due to abort (#950)
* silence exceptions in threaded requests to build storage if already aborted
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 521 |
1 files changed, 333 insertions, 188 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 1f8b96cc4..3a41cd7eb 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -1664,21 +1664,34 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } if (!BlockBuffer) { - BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - if (BlockBuffer && m_Storage.CacheStorage && m_Options.PopulateCache) + try + { + BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + } + if (!m_AbortFlag) + { + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + + if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(BlockBuffer))); } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - if (!m_AbortFlag) - { + uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; @@ -3293,31 +3306,45 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } else { - BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); - if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) + try { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); + BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); } - if (!BuildBlob) + catch (const std::exception&) { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } } - if (!m_Options.PrimeCacheOnly) + if (!m_AbortFlag) { - if (!m_AbortFlag) + if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BuildBlob))); + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!m_Options.PrimeCacheOnly) + { + if (!m_AbortFlag) { - FilteredDownloadedBytesPerSecond.Stop(); - } + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - OnDownloaded(std::move(BuildBlob)); + OnDownloaded(std::move(BuildBlob)); + } } } } @@ -3519,64 +3546,77 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); - BuildStorageBase::BuildBlobRanges RangeBuffers = - m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); - if (m_AbortFlag) + BuildStorageBase::BuildBlobRanges RangeBuffers; + + try { - break; + RangeBuffers = m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); } - if (RangeBuffers.PayloadBuffer) + catch (const std::exception&) { - if (RangeBuffers.Ranges.empty()) + // Silence http errors due to abort + if (!m_AbortFlag) { - // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 - // Upload to cache (if enabled) and use the whole payload for the remaining ranges + throw; + } + } - if (m_Storage.CacheStorage && m_Options.PopulateCache) + if (!m_AbortFlag) + { + if (RangeBuffers.PayloadBuffer) + { + if (RangeBuffers.Ranges.empty()) { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); - if (m_AbortFlag) + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 + // Upload to cache (if enabled) and use the whole payload for the remaining ranges + + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - break; + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); + if (m_AbortFlag) + { + break; + } } - } - SubRangeCount = Ranges.size() - SubRangeCountComplete; - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), - TotalRequestCount, - FilteredDownloadedBytesPerSecond, - OnDownloaded); + SubRangeCount = Ranges.size() - SubRangeCountComplete; + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + } + else + { + if (RangeBuffers.Ranges.size() != SubRanges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + SubRanges.size(), + BlockDescription.BlockHash, + RangeBuffers.Ranges.size())); + } + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangeBuffers.Ranges, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + } } else { - if (RangeBuffers.Ranges.size() != SubRanges.size()) - { - throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", - SubRanges.size(), - BlockDescription.BlockHash, - RangeBuffers.Ranges.size())); - } - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangeBuffers.Ranges, - TotalRequestCount, - FilteredDownloadedBytesPerSecond, - OnDownloaded); + throw std::runtime_error( + fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); } - } - else - { - throw std::runtime_error(fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); - } - SubRangeCountComplete += SubRangeCount; + SubRangeCountComplete += SubRangeCount; + } } } @@ -5150,48 +5190,80 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Payload.GetCompressed()); } - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); - UploadStats.BlocksBytes += CompressedBlockSize; - - if (m_Options.IsVerbose) + try { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); } - - if (m_Storage.CacheStorage && m_Options.PopulateCache) + catch (const std::exception&) { - m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } } - bool MetadataSucceeded = - m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) + if (!m_AbortFlag) { + UploadStats.BlocksBytes += CompressedBlockSize; + if (m_Options.IsVerbose) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); + ZEN_OPERATION_LOG_INFO( + m_LogOutput, + "Uploaded block {} ({}) containing {} chunks", + BlockHash, + NiceBytes(CompressedBlockSize), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - } + if (m_Storage.CacheStorage && m_Options.PopulateCache) + { + m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); + bool MetadataSucceeded = false; + try + { + MetadataSucceeded = + m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + + if (!m_AbortFlag) + { + if (MetadataSucceeded) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Uploaded block {} metadata ({})", + BlockHash, + NiceBytes(BlockMetaData.GetSize())); + } + + OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + UploadStats.BlocksBytes += BlockMetaData.GetSize(); + } + + UploadStats.BlockCount++; + if (UploadStats.BlockCount == NewBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + } } } } @@ -6215,44 +6287,76 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - if (m_Options.IsVerbose) + + try { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(PayloadSize), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } - UploadedBlockSize += PayloadSize; - TempUploadStats.BlocksBytes += PayloadSize; - - if (m_Storage.CacheStorage && m_Options.PopulateCache) + catch (const std::exception&) { - m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } } - bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) + + if (!m_AbortFlag) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} metadata ({})", + "Uploaded block {} ({}) containing {} chunks", BlockHash, - NiceBytes(BlockMetaData.GetSize())); + NiceBytes(PayloadSize), + NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } + UploadedBlockSize += PayloadSize; + TempUploadStats.BlocksBytes += PayloadSize; - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); - } + if (m_Storage.CacheStorage && m_Options.PopulateCache) + { + m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + + bool MetadataSucceeded = false; + try + { + MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + if (!m_AbortFlag) + { + if (MetadataSucceeded) + { + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Uploaded block {} metadata ({})", + BlockHash, + NiceBytes(BlockMetaData.GetSize())); + } + + NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); + } - TempUploadStats.BlockCount++; + TempUploadStats.BlockCount++; - if (UploadedBlockCount.fetch_add(1) + 1 == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); + if (UploadedBlockCount.fetch_add(1) + 1 == UploadBlockCount && UploadedChunkCount == UploadChunkCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + } } } }); @@ -6302,72 +6406,100 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); TempUploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( - m_BuildId, - RawHash, - ZenContentType::kCompressedBinary, - PayloadSize, - [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, - uint64_t Size) mutable -> IoBuffer { - FilteredUploadedBytesPerSecond.Start(); - - IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); - PartPayload.SetContentType(ZenContentType::kBinary); - return PartPayload; - }, - [RawSize, - &TempUploadStats, - &UploadedCompressedChunkSize, - &UploadChunkPool, - &UploadedBlockCount, - UploadBlockCount, - &UploadedChunkCount, - UploadChunkCount, - &FilteredUploadedBytesPerSecond, - &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { - TempUploadStats.ChunksBytes += SentBytes; - UploadedCompressedChunkSize += SentBytes; - if (IsComplete) - { - TempUploadStats.ChunkCount++; - if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && - UploadedBlockCount == UploadBlockCount) + try + { + std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( + m_BuildId, + RawHash, + ZenContentType::kCompressedBinary, + PayloadSize, + [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, + uint64_t Size) mutable -> IoBuffer { + FilteredUploadedBytesPerSecond.Start(); + + IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); + PartPayload.SetContentType(ZenContentType::kBinary); + return PartPayload; + }, + [RawSize, + &TempUploadStats, + &UploadedCompressedChunkSize, + &UploadChunkPool, + &UploadedBlockCount, + UploadBlockCount, + &UploadedChunkCount, + UploadChunkCount, + &FilteredUploadedBytesPerSecond, + &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { + TempUploadStats.ChunksBytes += SentBytes; + UploadedCompressedChunkSize += SentBytes; + if (IsComplete) { - FilteredUploadedBytesPerSecond.Stop(); + TempUploadStats.ChunkCount++; + if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && + UploadedBlockCount == UploadBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + UploadedRawChunkSize += RawSize; } - UploadedRawChunkSize += RawSize; - } - }); - for (auto& WorkPart : MultipartWork) - { - Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); - if (!AbortFlag) - { - Work(); - } - }); + }); + for (auto& WorkPart : MultipartWork) + { + Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); + if (!AbortFlag) + { + Work(); + } + }); + } + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Uploaded multipart chunk {} ({})", + RawHash, + NiceBytes(PayloadSize)); + } } - if (m_Options.IsVerbose) + catch (const std::exception&) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } } } else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - if (m_Options.IsVerbose) + try { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } } - TempUploadStats.ChunksBytes += Payload.GetSize(); - TempUploadStats.ChunkCount++; - UploadedCompressedChunkSize += Payload.GetSize(); - UploadedRawChunkSize += RawSize; - if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && UploadedBlockCount == UploadBlockCount) + if (!m_AbortFlag) { - FilteredUploadedBytesPerSecond.Stop(); + if (m_Options.IsVerbose) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); + } + TempUploadStats.ChunksBytes += Payload.GetSize(); + TempUploadStats.ChunkCount++; + UploadedCompressedChunkSize += Payload.GetSize(); + UploadedRawChunkSize += RawSize; + if (UploadedChunkCount.fetch_add(1) + 1 == UploadChunkCount && UploadedBlockCount == UploadBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } } } } @@ -7147,10 +7279,23 @@ BuildsOperationPrimeCache::Execute() } else { - IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - m_DownloadStats.RequestsCompleteCount++; + IoBuffer Payload; + try + { + Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); + + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + m_DownloadStats.RequestsCompleteCount++; + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } if (!m_AbortFlag) { @@ -7161,10 +7306,10 @@ BuildsOperationPrimeCache::Execute() ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(std::move(Payload)))); } - } - if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) - { - FilteredDownloadedBytesPerSecond.Stop(); + if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } } } } |