diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-16 13:17:54 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-16 13:17:54 +0200 |
| commit | d000167e12c6dde651ef86be9f67552291ff1b7d (patch) | |
| tree | 17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenserver/projectstore | |
| parent | fix build store range check (#437) (diff) | |
| download | zen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip | |
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork
* add asserts to Latch usage to catch usage errors
* extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 171 |
1 files changed, 104 insertions, 67 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 6359b9db9..53e687983 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1599,20 +1599,36 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + try { - if (OptionalWorkerPool) - { - Work.ScheduleWork(*OptionalWorkerPool, [&, Index = OpIndex](std::atomic<bool>&) { - ZEN_MEMSCOPE(GetProjectstoreTag()); - ValidateOne(Index); - }); - } - else + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { - ValidateOne(OpIndex); + if (AbortFlag) + { + break; + } + if (OptionalWorkerPool) + { + Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (AbortFlag) + { + return; + } + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); + } } } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); + } Work.Wait(); { @@ -2110,67 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + try { - if (Result.load() == false) + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { - break; - } - Work.ScheduleWork( - *OptionalWorkerPool, - [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]( - std::atomic<bool>&) { - if (Result.load() == false) - { - return; - } - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - try - { - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (!Payload) + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + return; } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + try + { + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (!Payload) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + } - if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + if (!AsyncCallback(FileChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) { - Result.store(false); + ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", + m_OuterProject->Identifier, + m_OplogId, + FileChunkIndex, + FilePath, + Ex.what()); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - FileChunkIndex, - FilePath, - Ex.what()); - } - }); - } + }); + } - if (!CidChunkHashes.empty()) + if (!CidChunkHashes.empty() && !AbortFlag) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + if (AbortFlag) + { + return false; + } + return AsyncCallback(CidChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + } + catch (const std::exception& Ex) { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); + AbortFlag.store(true); + ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); } Work.Wait(); - return Result.load(); + return !AbortFlag; } else { @@ -3894,19 +3924,26 @@ ProjectStore::Flush() std::atomic<bool> AbortFlag; std::atomic<bool> PauseFlag; ParallelWork Work(AbortFlag, PauseFlag); - - for (const Ref<Project>& Project : Projects) + try { - Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + for (const Ref<Project>& Project : Projects) + { + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); } Work.Wait(); |