diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-16 13:17:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-16 13:17:54 +0200 |
| commit | d000167e12c6dde651ef86be9f67552291ff1b7d (patch) | |
| tree | 17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src | |
| parent | fix build store range check (#437) (diff) | |
| download | zen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip | |
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork
* add asserts to Latch usage to catch usage errors
* extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/thread.h | 4 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 11 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 171 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 82 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 80 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 137 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 86 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 3 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 29 |
9 files changed, 359 insertions, 244 deletions
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h index 8fb781571..d9fb5c023 100644 --- a/src/zencore/include/zencore/thread.h +++ b/src/zencore/include/zencore/thread.h @@ -183,6 +183,7 @@ public: void CountDown() { std::ptrdiff_t Old = Counter.fetch_sub(1); + ZEN_ASSERT(Old > 0); if (Old == 1) { Complete.Set(); @@ -197,8 +198,7 @@ public: void AddCount(std::ptrdiff_t Count) { std::atomic_ptrdiff_t Old = Counter.fetch_add(Count); - ZEN_UNUSED(Old); - ZEN_ASSERT_SLOW(Old > 0); + ZEN_ASSERT(Old > 0); } bool Wait(int TimeoutMs = -1) diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 9f2e826d6..bb0c55618 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1593,10 +1593,19 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (AbortFlag) + { + return; + } + if (Body) { uint32_t AcceptMagic = 0; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6359b9db9..53e687983 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1599,20 +1599,36 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + try { - if (OptionalWorkerPool) - { - Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ValidateOne(Index); - }); - } - else + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { - ValidateOne(OpIndex); + if (AbortFlag) + { + break; + } + if (OptionalWorkerPool) + { + Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (AbortFlag) + { + return; + } + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); + } } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); + } Work.Wait(); { @@ -2110,67 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + try { - if (Result.load() == false) + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { - break; - } - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]( - std::atomic<bool>&) { - if (Result.load() == false) - { - return; - } - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - try - { - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (!Payload) + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + return; } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + try + { + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (!Payload) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + } - if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + if (!AsyncCallback(FileChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) { - Result.store(false); + ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", + m_OuterProject->Identifier, + m_OplogId, + FileChunkIndex, + FilePath, + Ex.what()); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - FileChunkIndex, - FilePath, - Ex.what()); - } - }); - } + }); + } - if (!CidChunkHashes.empty()) + if (!CidChunkHashes.empty() && !AbortFlag) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + if (AbortFlag) + { + return false; + } + return AsyncCallback(CidChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + } + catch (const std::exception& Ex) { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); + AbortFlag.store(true); + ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); } Work.Wait(); - return Result.load(); + return !AbortFlag; } else { @@ -3894,19 +3924,26 @@ ProjectStore::Flush() std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - - for (const Ref<Project>& Project : Projects) + try { - Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + for (const Ref<Project>& Project : Projects) + { + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); } Work.Wait(); diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index c25f762f5..20dc55bca 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -528,43 +528,55 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - m_MetadataBlockStore.IterateChunks( - MetaLocations, - [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( - uint32_t BlockIndex, - std::span<const size_t> ChunkIndexes) -> bool { - ZEN_UNUSED(BlockIndex); - if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) - { - return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); - } - else - { - ZEN_ASSERT(OptionalWorkerPool != nullptr); - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)]( - std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - return; - } - try - { - if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) + try + { + m_MetadataBlockStore.IterateChunks( + MetaLocations, + [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock]( + uint32_t BlockIndex, + std::span<const size_t> ChunkIndexes) -> bool { + ZEN_UNUSED(BlockIndex); + if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1) + { + return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result); + } + else + { + ZEN_ASSERT(OptionalWorkerPool != nullptr); + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &Result, + &MetaLocations, + &MetaLocationResultIndexes, + DoOneBlock, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - AbortFlag.store(true); + return; } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); - } - }); - return !Work.IsAborted(); - } - }); + try + { + if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what()); + } + }); + return !Work.IsAborted(); + } + }); + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what()); + } Work.Wait(); } diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 0ee70890c..0d2aef612 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -4036,50 +4036,58 @@ ZenCacheDiskLayer::DiscoverBuckets() std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - for (auto& BucketPath : FoundBucketDirectories) + try { - Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { - ZEN_MEMSCOPE(GetCacheDiskTag()); - - const std::string BucketName = PathToUtf8(BucketPath.stem()); - try - { - BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig; - if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName), - std::hash<std::string_view>(), - eastl::equal_to_2<std::string, std::string_view>()); - It != m_Configuration.BucketConfigMap.end()) - { - BucketConfig = &It->second; - } - - std::unique_ptr<CacheBucket> NewBucket = - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig); + for (auto& BucketPath : FoundBucketDirectories) + { + Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { + ZEN_MEMSCOPE(GetCacheDiskTag()); - CacheBucket* Bucket = nullptr; + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try { - RwLock::ExclusiveLockScope __(SyncLock); - auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); - Bucket = InsertResult.first->second.get(); - } - ZEN_ASSERT(Bucket); + BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig; + if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName), + std::hash<std::string_view>(), + eastl::equal_to_2<std::string, std::string_view>()); + It != m_Configuration.BucketConfigMap.end()) + { + BucketConfig = &It->second; + } - if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + std::unique_ptr<CacheBucket> NewBucket = + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig); + CacheBucket* Bucket = nullptr; { RwLock::ExclusiveLockScope __(SyncLock); - m_Buckets.erase(BucketName); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } } } - } - catch (const std::exception& Err) - { - ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } - }); + catch (const std::exception& Err) + { + ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what()); } Work.Wait(); } @@ -4220,8 +4228,10 @@ ZenCacheDiskLayer::Flush() } catch (const std::exception& Ex) { + AbortFlag.store(true); ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) { ZEN_UNUSED(IsAborted, IsPaused); ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir); diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 2ab5752ff..b00abb2cb 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -396,89 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas return m_BlockStore.IterateBlock( FoundChunkLocations, ChunkIndexes, - [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) { if (Data == nullptr) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer()); } return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size)); }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size)); }, LargeSizeLimit); }; - std::atomic<bool> AsyncContinue = true; + std::atomic<bool> AbortFlag; { - std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - const bool Continue = m_BlockStore.IterateChunks( - FoundChunkLocations, - [this, - &Work, - &AsyncContinue, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - &FoundChunkIndexes, - &FoundChunkLocations, - OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) - { - std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); - Work.ScheduleWork( - *OptionalWorkerPool, - [this, - &AsyncContinue, - &AsyncCallback, - LargeSizeLimit, - DoOneBlock, - BlockIndex, - &FoundChunkIndexes, - &FoundChunkLocations, - ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - AsyncContinue.store(false); - } - if (!AsyncContinue) - { - return; - } - try - { - bool Continue = - DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - if (!Continue) - { - AsyncContinue.store(false); - } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", - m_RootDirectory, - BlockIndex, - Ex.what()); - AsyncContinue.store(false); - } - }); - return AsyncContinue.load(); - } - else - { - return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); - } - }); - if (!Continue) + try { - AsyncContinue.store(false); + const bool Continue = m_BlockStore.IterateChunks( + FoundChunkLocations, + [this, + &Work, + &AbortFlag, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + &FoundChunkIndexes, + &FoundChunkLocations, + OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) + { + std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end()); + Work.ScheduleWork( + *OptionalWorkerPool, + [this, + &AsyncCallback, + LargeSizeLimit, + DoOneBlock, + BlockIndex, + &FoundChunkIndexes, + &FoundChunkLocations, + ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) { + if (AbortFlag) + { + return; + } + try + { + bool Continue = + DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes); + if (!Continue) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'", + m_RootDirectory, + BlockIndex, + Ex.what()); + AbortFlag.store(true); + } + }); + return !AbortFlag.load(); + } + else + { + if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes)) + { + AbortFlag.store(true); + } + return !AbortFlag.load(); + } + }); + if (!Continue) + { + AbortFlag.store(true); + } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what()); + } + Work.Wait(); } - return AsyncContinue.load(); + return !AbortFlag.load(); } void diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 11a266f1c..68644be2d 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -666,52 +666,64 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) + try { - if (!AsyncContinue) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - break; - } - size_t ChunkIndex = FoundChunkIndexes[Index]; - uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; - if (OptionalWorkerPool) - { - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { - if (AbortFlag) - { - AsyncContinue.store(false); - } - if (!AsyncContinue) - { - return; - } - try - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + if (AbortFlag) + { + AsyncContinue.store(false); + } + if (!AsyncContinue) + { + break; + } + size_t ChunkIndex = FoundChunkIndexes[Index]; + uint64_t ExpectedSize = FoundChunkExpectedSizes[Index]; + if (OptionalWorkerPool) + { + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) { + if (AbortFlag) { AsyncContinue.store(false); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", - m_RootDirectory, - ChunkHashes[ChunkIndex], - Ex.what()); - AsyncContinue.store(false); - } - }); - } - else - { - if (!ProcessOne(ChunkIndex, ExpectedSize)) + if (!AsyncContinue) + { + return; + } + try + { + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'", + m_RootDirectory, + ChunkHashes[ChunkIndex], + Ex.what()); + AsyncContinue.store(false); + } + }); + } + else { - AsyncContinue.store(false); + if (!ProcessOne(ChunkIndex, ExpectedSize)) + { + AsyncContinue.store(false); + } } } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed iterating chunks in {}. Reason: '{}'", this->m_RootDirectory, Ex.what()); + } Work.Wait(); } return AsyncContinue.load(); diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index d7e986551..639c6968c 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -2,6 +2,7 @@ #pragma once +#include <zencore/scopeguard.h> #include <zencore/thread.h> #include <zencore/workthreadpool.h> @@ -26,6 +27,7 @@ public: try { WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); }); try { while (m_PauseFlag && !m_AbortFlag) @@ -38,7 +40,6 @@ public: { OnError(std::current_exception(), m_AbortFlag); } - m_PendingWork.CountDown(); }); } catch (const std::exception&) diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index 67fc03c04..aa806438b 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -2,6 +2,7 @@ #include <zenutil/parallelwork.h> +#include <zencore/callstack.h> #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -34,7 +35,33 @@ ParallelWork::~ParallelWork() m_PendingWork.CountDown(); } m_PendingWork.Wait(); - ZEN_ASSERT(m_PendingWork.Remaining() == 0); + ptrdiff_t RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + void* Frames[8]; + uint32_t FrameCount = GetCallstack(2, 8, Frames); + CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames); + ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}", + RemainingWork, + CallstackToString(Callstack, " ")); + FreeCallstack(Callstack); + + uint32_t WaitedMs = 0; + while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000) + { + Sleep(50); + WaitedMs += 50; + } + RemainingWork = m_PendingWork.Remaining(); + if (RemainingWork != 0) + { + ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork) + } + else + { + ZEN_INFO("ParallelWork destructor safety wait succeeded"); + } + } } catch (const std::exception& Ex) { |