diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-19 22:25:58 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-19 22:25:58 +0200 |
| commit | 49701314f570da3622f11eb37cc889c7d39d9a93 (patch) | |
| tree | 6159bfc2ba7974a453ded7a58813134e523e9a62 /src | |
| parent | parallel work handle dispatch exception (#400) (diff) | |
| download | zen-49701314f570da3622f11eb37cc889c7d39d9a93.tar.xz zen-49701314f570da3622f11eb37cc889c7d39d9a93.zip | |
handle exception with batch work (#401)
* use ParallelWork in rpc playback
* use ParallelWork in projectstore
* use ParallelWork in buildstore
* use ParallelWork in cachedisklayer
* use ParallelWork in compactcas
* use ParallelWork in filecas
* don't set abort flag in ParallelWork destructor
* add PrepareFileForScatteredWrite for temp files in httpclient
* Use PrepareFileForScatteredWrite when stream-decompressing files
* be more relaxed when deleting temp files
* allow explicit zen-cache when using direct host url without resolving
* fix lambda capture when writing loose chunks
* no delay when attempting to remove temp files
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 542 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 17 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 40 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 58 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 25 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 128 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 70 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 1 |
9 files changed, 463 insertions, 424 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 815bb7597..78d362fdb 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -244,6 +244,24 @@ namespace { } } + void TryRemoveFile(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveFile(Path, Ec); + if (Ec) + { + if (IsFile(Path, Ec)) + { + Ec.clear(); + RemoveFile(Path, Ec); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: {}", Path, Ec.message()); + } + } + } + } + void RemoveFileWithRetry(const std::filesystem::path& Path) { std::error_code Ec; @@ -4864,6 +4882,7 @@ namespace { { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } + PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( @@ -5055,7 +5074,10 @@ namespace { FilteredWrittenBytesPerSecond.Stop(); } - RemoveFileWithRetry(CompressedChunkPath); + if (!CompressedChunkPath.empty()) + { + TryRemoveFile(CompressedChunkPath); + } std::vector<uint32_t> CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); @@ -5304,7 +5326,7 @@ namespace { } } } - RemoveFileWithRetry(CacheDirContent.Files[Index]); + TryRemoveFile(CacheDirContent.Files[Index]); } CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } @@ -5353,7 +5375,7 @@ namespace { } } } - RemoveFileWithRetry(BlockDirContent.Files[Index]); + TryRemoveFile(BlockDirContent.Files[Index]); } CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); @@ -6227,279 +6249,284 @@ namespace { continue; } - Work.ScheduleWork(WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); - std::filesystem::path ExistingCompressedChunkPath; - if (!PrimeCacheOnly) + Work.ScheduleWork( + WritePool, + [&, RemoteChunkIndex, ChunkTargetPtrs, BuildId, TotalRequestCount, TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!AbortFlag) { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); - if (IsFile(CompressedChunkPath)) + ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); + std::filesystem::path ExistingCompressedChunkPath; + if (!PrimeCacheOnly) { - IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); - if (ExistingCompressedPart) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path CompressedChunkPath = + ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); + if (ExistingCompressedPart) { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) { - FilteredDownloadedBytesPerSecond.Stop(); + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + ExistingCompressedChunkPath = std::move(CompressedChunkPath); + } + else + { + std::error_code DummyEc; + RemoveFile(CompressedChunkPath, DummyEc); } - ExistingCompressedChunkPath = std::move(CompressedChunkPath); - } - else - { - std::error_code DummyEc; - RemoveFile(CompressedChunkPath, DummyEc); } } } - } - if (!AbortFlag) + if (!AbortFlag) - { - if (!ExistingCompressedChunkPath.empty()) { - Work.ScheduleWork( - WritePool, - [&Path, - &ZenFolderPath, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + WritePool, + [&Path, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats); - WritePartsComplete++; + FilteredWrittenBytesPerSecond.Start(); - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) { - FilteredWrittenBytesPerSecond.Stop(); + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); } - RemoveFileWithRetry(CompressedChunkPath); + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - RemoteLookup, - CompletedSequences, - Work, - WritePool); - } - else + if (!AbortFlag) { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + TryRemoveFile(CompressedChunkPath); + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } } } - } - }); - } - else - { - Work.ScheduleWork( - NetworkPool, - [&Path, - &ZenFolderPath, - &Storage, - BuildId, - &PrimeCacheOnly, - &RemoteContent, - &RemoteLookup, - &ExistsResult, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &NetworkPool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - LargeAttachmentSize, - PreferredMultipartChunkSize, - RemoteChunkIndex, - ChunkTargetPtrs, - &DownloadStats](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) - { - BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); - } - if (BuildBlob) + }); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &ZenFolderPath, + &Storage, + BuildId, + &PrimeCacheOnly, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { + if (!AbortFlag) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) { - FilteredDownloadedBytesPerSecond.Stop(); + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - else - { - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + if (BuildBlob) { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob( - *Storage.BuildStorage, - ZenTempDownloadFolderPath(ZenFolderPath), - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( - IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!PrimeCacheOnly) - { - if (!AbortFlag) - { - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - }); + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } else { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); - if (BuildBlob && Storage.BuildCacheStorage) + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - ChunkHash, - BuildBlob.GetContentType(), - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + *Storage.BuildStorage, + ZenTempDownloadFolderPath(ZenFolderPath), + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( + IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); } - if (!PrimeCacheOnly) + else { - if (!AbortFlag) + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) { - FilteredDownloadedBytesPerSecond.Stop(); + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); } } } } - } - }); + }); + } } } - } - }); + }); } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) @@ -6728,8 +6755,11 @@ namespace { RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } + + TryRemoveFile(BlockChunkPath); + WritePartsComplete++; - RemoveFileWithRetry(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -6884,7 +6914,7 @@ namespace { if (!BlockChunkPath.empty()) { - RemoveFileWithRetry(BlockChunkPath); + TryRemoveFile(BlockChunkPath); } WritePartsComplete++; @@ -7059,7 +7089,7 @@ namespace { if (!BlockChunkPath.empty()) { - RemoveFileWithRetry(BlockChunkPath); + TryRemoveFile(BlockChunkPath); } WritePartsComplete++; @@ -9456,6 +9486,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::string CloudHost; + auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> { + HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{1000}, + .Timeout = std::chrono::milliseconds{2000}, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 0}; + HttpClient TestHttpClient(BaseUrl, TestClientSettings); + HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); + if (TestResponse.IsSuccess()) + { + return {true, ""}; + } + return {false, TestResponse.ErrorMessage("")}; + }; + if (!m_Host.empty()) { if (m_OverrideHost.empty() || m_ZenCacheHost.empty()) @@ -9535,22 +9581,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.second)); } - auto TestCacheEndpoint = [](std::string_view BaseUrl, const bool AssumeHttp2) -> std::pair<bool, std::string> { - HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); - if (TestResponse.IsSuccess()) - { - return {true, ""}; - } - return {false, TestResponse.ErrorMessage("")}; - }; - if (m_ZenCacheHost.empty()) { CbArrayView CacheEndpointsArray = ResponseObjectView["cacheEndpoints"sv].AsArrayView(); @@ -9680,7 +9710,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { CacheDescription += fmt::format(" Bucket '{}'", m_Bucket); } - Result.CacheName = BuildCacheName; + Result.CacheName = BuildCacheName.empty() ? m_ZenCacheHost : BuildCacheName; } ZEN_CONSOLE("Remote: {}", StorageDescription); if (!Result.CacheName.empty()) diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index ca1b820c9..a6e4d9290 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -86,7 +86,7 @@ namespace detail { } } - std::error_code Open(const std::filesystem::path& TempFolderPath) + std::error_code Open(const std::filesystem::path& TempFolderPath, uint64_t FinalSize) { ZEN_TRACE_CPU("TempPayloadFile::Open"); ZEN_ASSERT(m_FileHandle == nullptr); @@ -128,6 +128,8 @@ namespace detail { #endif // ZEN_PLATFORM_WINDOWS m_FileHandle = FileHandle; + PrepareFileForScatteredWrite(m_FileHandle, FinalSize); + return {}; } @@ -1211,7 +1213,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique<detail::TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); + std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value()); if (Ec) { ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..f7e63433b 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include "upstream/upstreamcache.h" @@ -1585,12 +1586,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); @@ -1634,16 +1636,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6a55efdb7..7d22da717 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -22,6 +22,7 @@ #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/openprocesscache.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -1588,17 +1589,14 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo } }; - Latch WorkLatch(1); - + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { if (OptionalWorkerPool) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { + Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) { ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); ValidateOne(Index); }); } @@ -1607,9 +1605,7 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo ValidateOne(OpIndex); } } - - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); { // Check if we were deleted while we were checking the references without a lock... @@ -2106,8 +2102,9 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; - Latch WorkLatch(1); + std::atomic_bool Result = true; + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { @@ -2115,10 +2112,10 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, { break; } - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork( - [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]( + std::atomic<bool>&) { if (Result.load() == false) { return; @@ -2162,8 +2159,7 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, LargeSizeLimit); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); return Result.load(); } @@ -3886,13 +3882,12 @@ ProjectStore::Flush() } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (const Ref<Project>& Project : Projects) { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([this, &WorkLatch, Project]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { try { Project->Flush(); @@ -3904,8 +3899,7 @@ ProjectStore::Flush() }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } void diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index b4891a742..6eb01dfc4 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -9,6 +9,7 @@ #include <zencore/scopeguard.h> #include <zencore/trace.h> #include <zencore/workthreadpool.h> +#include <zenutil/parallelwork.h> #include <zencore/uid.h> #include <zencore/xxhash.h> @@ -22,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/compress.h> # include <zencore/testing.h> # include <zencore/testutils.h> +# include <zenutil/workerpools.h> #endif // ZEN_WITH_TESTS namespace zen { @@ -480,11 +482,12 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (!MetaLocations.empty()) { - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); m_MetadataBlockStore.IterateChunks( MetaLocations, - [this, OptionalWorkerPool, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock, &WorkLatch]( + [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) -> bool { ZEN_UNUSED(BlockIndex); @@ -496,40 +499,31 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O { ZEN_ASSERT(OptionalWorkerPool != nullptr); std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - try - { - OptionalWorkerPool->ScheduleWork([this, - &Result, - &MetaLocations, - &MetaLocationResultIndexes, - DoOneBlock, - &WorkLatch, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } try { - DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) + { + AbortFlag.store(true); + } } catch (const std::exception& Ex) { ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); } }); - } - catch (const std::exception& Ex) - { - WorkLatch.CountDown(); - ZEN_ERROR("Failed dispatching async work to fetch metadata for {} chunks. Reason: {}", - ChunkIndexes.size(), - Ex.what()); - } - return true; + return !Work.IsAborted(); } }); - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } for (size_t Index = 0; Index < Result.size(); Index++) { @@ -1661,6 +1655,8 @@ TEST_CASE("BuildStore.Metadata") ScopedTemporaryDirectory _; + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); + BuildStoreConfig Config; Config.RootDirectory = _.Path() / "build_store"; @@ -1678,7 +1674,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(BlobHashes, MetaPayloads); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1689,7 +1685,7 @@ TEST_CASE("BuildStore.Metadata") { GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, nullptr); + std::vector<IoBuffer> ValidateMetaPayloads = Store.GetMetadatas(BlobHashes, &WorkerPool); CHECK(ValidateMetaPayloads.size() == MetaPayloads.size()); for (size_t I = 0; I < ValidateMetaPayloads.size(); I++) { @@ -1715,7 +1711,7 @@ TEST_CASE("BuildStore.Metadata") Store.PutBlob(CompressedBlobsHashes.back(), Payload); } - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); for (const auto& MetadataIt : MetadataPayloads) { CHECK(!MetadataIt); @@ -1741,7 +1737,7 @@ TEST_CASE("BuildStore.Metadata") } Store.PutMetadatas(CompressedBlobsHashes, BlobMetaPayloads); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1754,7 +1750,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { @@ -1783,7 +1779,7 @@ TEST_CASE("BuildStore.Metadata") GcManager Gc; BuildStore Store(Config, Gc); - std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, nullptr); + std::vector<IoBuffer> MetadataPayloads = Store.GetMetadatas(CompressedBlobsHashes, &WorkerPool); CHECK(MetadataPayloads.size() == BlobMetaPayloads.size()); for (size_t I = 0; I < MetadataPayloads.size(); I++) { diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 91bd9cba8..d80da6ea6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -3936,14 +3937,13 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (auto& BucketPath : FoundBucketDirectories) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([this, &WorkLatch, &SyncLock, BucketPath]() { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { @@ -3984,8 +3984,7 @@ ZenCacheDiskLayer::DiscoverBuckets() } }); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } bool @@ -4062,16 +4061,15 @@ ZenCacheDiskLayer::Flush() } { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); try { for (auto& Bucket : Buckets) { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&WorkLatch, Bucket]() { + Work.ScheduleWork(Pool, [Bucket](std::atomic<bool>&) { ZEN_MEMSCOPE(GetCacheDiskTag()); - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); try { Bucket->Flush(); @@ -4087,11 +4085,8 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - WorkLatch.CountDown(); - while (!WorkLatch.Wait(1000)) - { - ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", WorkLatch.Remaining(), m_RootDir); - } + Work.Wait(1000, + [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 8cf241e34..15bea272b 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -15,6 +15,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #include <gsl/gsl-lite.hpp> @@ -393,64 +394,75 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas LargeSizeLimit); }; - Latch WorkLatch(1); - std::atomic_bool AsyncContinue = true; - bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, - &AsyncContinue, - &WorkLatch, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - WorkLatch.CountDown(); - WorkLatch.Wait(); - return AsyncContinue.load() && Continue; + std::atomic<bool> AsyncContinue = true; + { + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncContinue, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AsyncContinue.store(false); + } + }); + return AsyncContinue.load(); + } + else + { + return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + } + }); + if (!Continue) + { + AsyncContinue.store(false); + } + Work.Wait(); + } + return AsyncContinue.load(); } void diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 4911bffb9..6354edf70 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -20,6 +20,7 @@ #include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> +#include <zenutil/parallelwork.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -632,10 +633,11 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } } } - std::atomic_bool Continue = true; + std::atomic<bool> AsyncContinue = true; if (!FoundChunkIndexes.empty()) { - auto ProcessOne = [this, &ChunkHashes, &Continue, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + auto ProcessOne = [this, &ChunkHashes, &AsyncCallback](size_t ChunkIndex, uint64_t ExpectedSize) { + ZEN_ASSERT(ChunkIndex < ChunkHashes.size()); const IoHash& ChunkHash = ChunkHashes[ChunkIndex]; IoBuffer Payload = SafeOpenChunk(ChunkHash, ExpectedSize); if (!AsyncCallback(ChunkIndex, std::move(Payload))) @@ -645,49 +647,57 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, return true; }; - Latch WorkLatch(1); + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - size_t ChunkIndex = FoundChunkIndexes[Index]; - uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; - if (!Continue) + if (!AsyncContinue) { break; } + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; if (OptionalWorkerPool) { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([this, &WorkLatch, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &Continue]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (!Continue) - { - return; - } - try - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - Continue = false; + AsyncContinue.store(false); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", - m_RootDirectory, - ChunkHashes[ChunkIndex], - Ex.what()); - } - }); + if (!AsyncContinue) + { + return; + } + try + { + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); + AsyncContinue.store(false); + } + }); } else { - Continue = Continue && ProcessOne(ChunkIndex, ExpectedSize); + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } } } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } - return Continue; + return AsyncContinue.load(); } void diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 516d70e28..91591375a 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -30,7 +30,6 @@ ParallelWork::~ParallelWork() "to complete"); m_PendingWork.CountDown(); } - m_AbortFlag.store(true); m_PendingWork.Wait(); ZEN_ASSERT(m_PendingWork.Remaining() == 0); } |