aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/projectstore
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-16 13:17:54 +0200
committerGitHub Enterprise <[email protected]>2025-06-16 13:17:54 +0200
commitd000167e12c6dde651ef86be9f67552291ff1b7d (patch)
tree17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenserver/projectstore
parentfix build store range check (#437) (diff)
downloadzen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz
zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenserver/projectstore')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp171
1 files changed, 104 insertions, 67 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 6359b9db9..53e687983 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1599,20 +1599,36 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
+ try
{
- if (OptionalWorkerPool)
- {
- Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) {
- ZEN_MEMSCOPE(GetProjectstoreTag());
- ValidateOne(Index);
- });
- }
- else
+ for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
- ValidateOne(OpIndex);
+ if (AbortFlag)
+ {
+ break;
+ }
+ if (OptionalWorkerPool)
+ {
+ Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) {
+ ZEN_MEMSCOPE(GetProjectstoreTag());
+ if (AbortFlag)
+ {
+ return;
+ }
+ ValidateOne(Index);
+ });
+ }
+ else
+ {
+ ValidateOne(OpIndex);
+ }
}
}
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what());
+ }
Work.Wait();
{
@@ -2110,67 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
}
if (OptionalWorkerPool)
{
- std::atomic_bool Result = true;
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
-
- for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
+ try
{
- if (Result.load() == false)
+ for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
- break;
- }
- Work.ScheduleWork(
- *OptionalWorkerPool,
- [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result](
- std::atomic<bool>&) {
- if (Result.load() == false)
- {
- return;
- }
- size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
- const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
- try
- {
- IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
- if (!Payload)
+ if (AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ *OptionalWorkerPool,
+ [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback](
+ std::atomic<bool>& AbortFlag) {
+ if (AbortFlag)
{
- ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
+ return;
}
+ size_t FileChunkIndex = FileChunkIndexes[ChunkIndex];
+ const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex];
+ try
+ {
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath);
+ if (!Payload)
+ {
+ ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath);
+ }
- if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
+ if (!AsyncCallback(FileChunkIndex,
+ Payload,
+ IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0))
+ {
+ AbortFlag.store(true);
+ }
+ }
+ catch (const std::exception& Ex)
{
- Result.store(false);
+ ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
+ m_OuterProject->Identifier,
+ m_OplogId,
+ FileChunkIndex,
+ FilePath,
+ Ex.what());
}
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'",
- m_OuterProject->Identifier,
- m_OplogId,
- FileChunkIndex,
- FilePath,
- Ex.what());
- }
- });
- }
+ });
+ }
- if (!CidChunkHashes.empty())
+ if (!CidChunkHashes.empty() && !AbortFlag)
+ {
+ m_CidStore.IterateChunks(
+ CidChunkHashes,
+ [&](size_t Index, const IoBuffer& Payload) {
+ size_t CidChunkIndex = CidChunkIndexes[Index];
+ if (AbortFlag)
+ {
+ return false;
+ }
+ return AsyncCallback(CidChunkIndex,
+ Payload,
+ IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
+ },
+ OptionalWorkerPool,
+ LargeSizeLimit);
+ }
+ }
+ catch (const std::exception& Ex)
{
- m_CidStore.IterateChunks(
- CidChunkHashes,
- [&](size_t Index, const IoBuffer& Payload) {
- size_t CidChunkIndex = CidChunkIndexes[Index];
- return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0);
- },
- OptionalWorkerPool,
- LargeSizeLimit);
+ AbortFlag.store(true);
+ ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what());
}
Work.Wait();
- return Result.load();
+ return !AbortFlag;
}
else
{
@@ -3894,19 +3924,26 @@ ProjectStore::Flush()
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
-
- for (const Ref<Project>& Project : Projects)
+ try
{
- Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
- try
- {
- Project->Flush();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
- }
- });
+ for (const Ref<Project>& Project : Projects)
+ {
+ Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) {
+ try
+ {
+ Project->Flush();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what());
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what());
}
Work.Wait();