aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-16 13:17:54 +0200
committerGitHub Enterprise <[email protected]>2025-06-16 13:17:54 +0200
commitd000167e12c6dde651ef86be9f67552291ff1b7d (patch)
tree17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src
parentfix build store range check (#437) (diff)
downloadzen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz
zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/thread.h4
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp11
-rw-r--r--src/zenserver/projectstore/projectstore.cpp171
-rw-r--r--src/zenstore/buildstore/buildstore.cpp82
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp80
-rw-r--r--src/zenstore/compactcas.cpp137
-rw-r--r--src/zenstore/filecas.cpp86
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h3
-rw-r--r--src/zenutil/parallelwork.cpp29
9 files changed, 359 insertions, 244 deletions
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h
index 8fb781571..d9fb5c023 100644
--- a/src/zencore/include/zencore/thread.h
+++ b/src/zencore/include/zencore/thread.h
@@ -183,6 +183,7 @@ public:
void CountDown()
{
std::ptrdiff_t Old = Counter.fetch_sub(1);
+ ZEN_ASSERT(Old > 0);
if (Old == 1)
{
Complete.Set();
@@ -197,8 +198,7 @@ public:
void AddCount(std::ptrdiff_t Count)
{
std::atomic_ptrdiff_t Old = Counter.fetch_add(Count);
- ZEN_UNUSED(Old);
- ZEN_ASSERT_SLOW(Old > 0);
+ ZEN_ASSERT(Old > 0);
}
bool Wait(int TimeoutMs = -1)
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 9f2e826d6..bb0c55618 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1593,10 +1593,19 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
- Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>&) {
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) {
IoBuffer Body;
zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body);
+ if (AbortFlag)
+ {
+ return;
+ }
+
if (Body)
{
uint32_t AcceptMagic = 0;
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6359b9db9..53e687983 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1599,20 +1599,36 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
+ try
{
- if (OptionalWorkerPool)
- {
- Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) {
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ValidateOne(Index);
- });
- }
- else
+ for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
- ValidateOne(OpIndex);
+ if (AbortFlag)
+ {
+ break;
+ }
+ if (OptionalWorkerPool)
+ {
+ Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ if (AbortFlag)
+ {
+ return;
+ }
+ ValidateOne(Index);
+ });
+ }
+ else
+ {
+ ValidateOne(OpIndex);
+ }
}
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what());
+ }
Work.Wait();
{
@@ -2110,67 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
if (OptionalWorkerPool)
{
- std::atomic_bool Result = true;
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
-
- for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+ try
{
- if (Result.load() == false)
+ for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
- break;
- }
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result](
- std::atomic<bool>&) {
- if (Result.load() == false)
- {
- return;
- }
- size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
- const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
- try
- {
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
- if (!Payload)
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback](
+ std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
+ return;
}
+ size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
+ const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
+ try
+ {
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+ if (!Payload)
+ {
+ ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
+ }
- if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
+ if (!AsyncCallback(FileChunkIndex,
+ Payload,
+ IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
{
- Result.store(false);
+ ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ FileChunkIndex,
+ FilePath,
+ Ex.what());
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
- m_OuterProject->Identifier,
- m_OplogId,
- FileChunkIndex,
- FilePath,
- Ex.what());
- }
- });
- }
+ });
+ }
- if (!CidChunkHashes.empty())
+ if (!CidChunkHashes.empty() && !AbortFlag)
+ {
+ m_CidStore.IterateChunks(
+ CidChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ size_t CidChunkIndex = CidChunkIndexes[Index];
+ if (AbortFlag)
+ {
+ return false;
+ }
+ return AsyncCallback(CidChunkIndex,
+ Payload,
+ IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
+ },
+ OptionalWorkerPool,
+ LargeSizeLimit);
+ }
+ }
+ catch (const std::exception& Ex)
{
- m_CidStore.IterateChunks(
- CidChunkHashes,
- [&](size_t Index, const IoBuffer& Payload) {
- size_t CidChunkIndex = CidChunkIndexes[Index];
- return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
- },
- OptionalWorkerPool,
- LargeSizeLimit);
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what());
}
Work.Wait();
- return Result.load();
+ return !AbortFlag;
}
else
{
@@ -3894,19 +3924,26 @@ ProjectStore::Flush()
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
-
- for (const Ref<Project>& Project : Projects)
+ try
{
- Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
- try
- {
- Project->Flush();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
- }
- });
+ for (const Ref<Project>& Project : Projects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
+ try
+ {
+ Project->Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what());
}
Work.Wait();
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index c25f762f5..20dc55bca 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -528,43 +528,55 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- m_MetadataBlockStore.IterateChunks(
- MetaLocations,
- [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
- uint32_t BlockIndex,
- std::span<const size_t> ChunkIndexes) -> bool {
- ZEN_UNUSED(BlockIndex);
- if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
- {
- return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
- }
- else
- {
- ZEN_ASSERT(OptionalWorkerPool != nullptr);
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this, &Result, &MetaLocations, &MetaLocationResultIndexes, DoOneBlock, ChunkIndexes = std::move(TmpChunkIndexes)](
- std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- return;
- }
- try
- {
- if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ try
+ {
+ m_MetadataBlockStore.IterateChunks(
+ MetaLocations,
+ [this, OptionalWorkerPool, &Work, &Result, &MetaLocations, &MetaLocationResultIndexes, &ReferencedBlocks, DoOneBlock](
+ uint32_t BlockIndex,
+ std::span<const size_t> ChunkIndexes) -> bool {
+ ZEN_UNUSED(BlockIndex);
+ if (ChunkIndexes.size() == MetaLocations.size() || OptionalWorkerPool == nullptr || ReferencedBlocks.size() == 1)
+ {
+ return DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result);
+ }
+ else
+ {
+ ZEN_ASSERT(OptionalWorkerPool != nullptr);
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &Result,
+ &MetaLocations,
+ &MetaLocationResultIndexes,
+ DoOneBlock,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- AbortFlag.store(true);
+ return;
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
- }
- });
- return !Work.IsAborted();
- }
- });
+ try
+ {
+ if (!DoOneBlock(MetaLocations, MetaLocationResultIndexes, ChunkIndexes, Result))
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed getting metadata for {} chunks. Reason: {}", ChunkIndexes.size(), Ex.what());
+ }
+ });
+ return !Work.IsAborted();
+ }
+ });
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating block metadata chunks in {}. Reason: '{}'", m_Config.RootDirectory, Ex.what());
+ }
Work.Wait();
}
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 0ee70890c..0d2aef612 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4036,50 +4036,58 @@ ZenCacheDiskLayer::DiscoverBuckets()
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (auto& BucketPath : FoundBucketDirectories)
+ try
{
- Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
- ZEN_MEMSCOPE(GetCacheDiskTag());
-
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- try
- {
- BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
- if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
- std::hash<std::string_view>(),
- eastl::equal_to_2<std::string, std::string_view>());
- It != m_Configuration.BucketConfigMap.end())
- {
- BucketConfig = &It->second;
- }
-
- std::unique_ptr<CacheBucket> NewBucket =
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
- CacheBucket* Bucket = nullptr;
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
{
- RwLock::ExclusiveLockScope __(SyncLock);
- auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
- Bucket = InsertResult.first->second.get();
- }
- ZEN_ASSERT(Bucket);
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
- if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ CacheBucket* Bucket = nullptr;
{
RwLock::ExclusiveLockScope __(SyncLock);
- m_Buckets.erase(BucketName);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
}
}
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- });
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what());
}
Work.Wait();
}
@@ -4220,8 +4228,10 @@ ZenCacheDiskLayer::Flush()
}
catch (const std::exception& Ex)
{
+ AbortFlag.store(true);
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
+
Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
ZEN_UNUSED(IsAborted, IsPaused);
ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 2ab5752ff..b00abb2cb 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -396,89 +396,96 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
return m_BlockStore.IterateBlock(
FoundChunkLocations,
ChunkIndexes,
- [AsyncCallback, FoundChunkIndexes, LargeSizeLimit](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, const void* Data, uint64_t Size) {
if (Data == nullptr)
{
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer());
}
return AsyncCallback(FoundChunkIndexes[ChunkIndex], IoBuffer(IoBuffer::Wrap, Data, Size));
},
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ [AsyncCallback, FoundChunkIndexes](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
return AsyncCallback(FoundChunkIndexes[ChunkIndex], File.GetChunk(Offset, Size));
},
LargeSizeLimit);
};
- std::atomic<bool> AsyncContinue = true;
+ std::atomic<bool> AbortFlag;
{
- std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- const bool Continue = m_BlockStore.IterateChunks(
- FoundChunkLocations,
- [this,
- &Work,
- &AsyncContinue,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
- if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
- {
- std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this,
- &AsyncContinue,
- &AsyncCallback,
- LargeSizeLimit,
- DoOneBlock,
- BlockIndex,
- &FoundChunkIndexes,
- &FoundChunkLocations,
- ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- AsyncContinue.store(false);
- }
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- bool Continue =
- DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- if (!Continue)
- {
- AsyncContinue.store(false);
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
- m_RootDirectory,
- BlockIndex,
- Ex.what());
- AsyncContinue.store(false);
- }
- });
- return AsyncContinue.load();
- }
- else
- {
- return DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
- }
- });
- if (!Continue)
+ try
{
- AsyncContinue.store(false);
+ const bool Continue = m_BlockStore.IterateChunks(
+ FoundChunkLocations,
+ [this,
+ &Work,
+ &AbortFlag,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ OptionalWorkerPool](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) {
+ if (OptionalWorkerPool && (ChunkIndexes.size() > 3))
+ {
+ std::vector<size_t> TmpChunkIndexes(ChunkIndexes.begin(), ChunkIndexes.end());
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this,
+ &AsyncCallback,
+ LargeSizeLimit,
+ DoOneBlock,
+ BlockIndex,
+ &FoundChunkIndexes,
+ &FoundChunkLocations,
+ ChunkIndexes = std::move(TmpChunkIndexes)](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
+ {
+ return;
+ }
+ try
+ {
+ bool Continue =
+ DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes);
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, block {}. Reason: '{}'",
+ m_RootDirectory,
+ BlockIndex,
+ Ex.what());
+ AbortFlag.store(true);
+ }
+ });
+ return !AbortFlag.load();
+ }
+ else
+ {
+ if (!DoOneBlock(AsyncCallback, LargeSizeLimit, FoundChunkIndexes, FoundChunkLocations, ChunkIndexes))
+ {
+ AbortFlag.store(true);
+ }
+ return !AbortFlag.load();
+ }
+ });
+ if (!Continue)
+ {
+ AbortFlag.store(true);
+ }
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating chunks for cas root path {}. Reason: '{}'", m_RootDirectory, Ex.what());
+ }
+
Work.Wait();
}
- return AsyncContinue.load();
+ return !AbortFlag.load();
}
void
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 11a266f1c..68644be2d 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -666,52 +666,64 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
+ try
{
- if (!AsyncContinue)
+ for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
- break;
- }
- size_t ChunkIndex = FoundChunkIndexes[Index];
- uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
- if (OptionalWorkerPool)
- {
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
- if (AbortFlag)
- {
- AsyncContinue.store(false);
- }
- if (!AsyncContinue)
- {
- return;
- }
- try
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ if (AbortFlag)
+ {
+ AsyncContinue.store(false);
+ }
+ if (!AsyncContinue)
+ {
+ break;
+ }
+ size_t ChunkIndex = FoundChunkIndexes[Index];
+ uint64_t ExpectedSize = FoundChunkExpectedSizes[Index];
+ if (OptionalWorkerPool)
+ {
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ProcessOne, &ChunkHashes, ChunkIndex, ExpectedSize, &AsyncContinue](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
AsyncContinue.store(false);
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
- m_RootDirectory,
- ChunkHashes[ChunkIndex],
- Ex.what());
- AsyncContinue.store(false);
- }
- });
- }
- else
- {
- if (!ProcessOne(ChunkIndex, ExpectedSize))
+ if (!AsyncContinue)
+ {
+ return;
+ }
+ try
+ {
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed iterating chunks for cas root path {}, chunk {}. Reason: '{}'",
+ m_RootDirectory,
+ ChunkHashes[ChunkIndex],
+ Ex.what());
+ AsyncContinue.store(false);
+ }
+ });
+ }
+ else
{
- AsyncContinue.store(false);
+ if (!ProcessOne(ChunkIndex, ExpectedSize))
+ {
+ AsyncContinue.store(false);
+ }
}
}
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating chunks in {}. Reason: '{}'", this->m_RootDirectory, Ex.what());
+ }
Work.Wait();
}
return AsyncContinue.load();
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index d7e986551..639c6968c 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/workthreadpool.h>
@@ -26,6 +27,7 @@ public:
try
{
WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ auto _ = MakeGuard([this]() { m_PendingWork.CountDown(); });
try
{
while (m_PauseFlag && !m_AbortFlag)
@@ -38,7 +40,6 @@ public:
{
OnError(std::current_exception(), m_AbortFlag);
}
- m_PendingWork.CountDown();
});
}
catch (const std::exception&)
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index 67fc03c04..aa806438b 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -2,6 +2,7 @@
#include <zenutil/parallelwork.h>
+#include <zencore/callstack.h>
#include <zencore/except.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -34,7 +35,33 @@ ParallelWork::~ParallelWork()
m_PendingWork.CountDown();
}
m_PendingWork.Wait();
- ZEN_ASSERT(m_PendingWork.Remaining() == 0);
+ ptrdiff_t RemainingWork = m_PendingWork.Remaining();
+ if (RemainingWork != 0)
+ {
+ void* Frames[8];
+ uint32_t FrameCount = GetCallstack(2, 8, Frames);
+ CallstackFrames* Callstack = CreateCallstack(FrameCount, Frames);
+ ZEN_ERROR("ParallelWork destructor waited for outstanding work but pending work count is {} instead of 0\n{}",
+ RemainingWork,
+ CallstackToString(Callstack, " "));
+ FreeCallstack(Callstack);
+
+ uint32_t WaitedMs = 0;
+ while (m_PendingWork.Remaining() > 0 && WaitedMs < 2000)
+ {
+ Sleep(50);
+ WaitedMs += 50;
+ }
+ RemainingWork = m_PendingWork.Remaining();
+ if (RemainingWork != 0)
+ {
+ ZEN_WARN("ParallelWork destructor safety wait failed, pending work count at {}", RemainingWork)
+ }
+ else
+ {
+ ZEN_INFO("ParallelWork destructor safety wait succeeded");
+ }
+ }
}
catch (const std::exception& Ex)
{