aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-16 13:17:54 +0200
committerGitHub Enterprise <[email protected]>2025-06-16 13:17:54 +0200
commitd000167e12c6dde651ef86be9f67552291ff1b7d (patch)
tree17fb42c4c7d61b3064c33d6aa6f8787bef329586 /src/zenstore/cache/cachedisklayer.cpp
parentfix build store range check (#437) (diff)
downloadzen-d000167e12c6dde651ef86be9f67552291ff1b7d.tar.xz
zen-d000167e12c6dde651ef86be9f67552291ff1b7d.zip
graceful wait in parallelwork destructor (#438)
* exception safety when issuing ParallelWork * add asserts to Latch usage to catch usage errors * extended error messaging and recovery handling in ParallelWork destructor to help find issues
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp80
1 files changed, 45 insertions, 35 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 0ee70890c..0d2aef612 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -4036,50 +4036,58 @@ ZenCacheDiskLayer::DiscoverBuckets()
std::atomic<bool> AbortFlag;
std::atomic<bool> PauseFlag;
ParallelWork Work(AbortFlag, PauseFlag);
- for (auto& BucketPath : FoundBucketDirectories)
+ try
{
- Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
- ZEN_MEMSCOPE(GetCacheDiskTag());
-
- const std::string BucketName = PathToUtf8(BucketPath.stem());
- try
- {
- BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
- if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
- std::hash<std::string_view>(),
- eastl::equal_to_2<std::string, std::string_view>());
- It != m_Configuration.BucketConfigMap.end())
- {
- BucketConfig = &It->second;
- }
-
- std::unique_ptr<CacheBucket> NewBucket =
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
+ ZEN_MEMSCOPE(GetCacheDiskTag());
- CacheBucket* Bucket = nullptr;
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
{
- RwLock::ExclusiveLockScope __(SyncLock);
- auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
- Bucket = InsertResult.first->second.get();
- }
- ZEN_ASSERT(Bucket);
+ BucketConfiguration* BucketConfig = &m_Configuration.BucketConfig;
+ if (auto It = m_Configuration.BucketConfigMap.find_as(std::string_view(BucketName),
+ std::hash<std::string_view>(),
+ eastl::equal_to_2<std::string, std::string_view>());
+ It != m_Configuration.BucketConfigMap.end())
+ {
+ BucketConfig = &It->second;
+ }
- if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, *BucketConfig);
+ CacheBucket* Bucket = nullptr;
{
RwLock::ExclusiveLockScope __(SyncLock);
- m_Buckets.erase(BucketName);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
}
}
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
- });
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("Opening bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ AbortFlag.store(true);
+ ZEN_WARN("Failed discovering buckets in {}. Reason: '{}'", m_RootDir, Ex.what());
}
Work.Wait();
}
@@ -4220,8 +4228,10 @@ ZenCacheDiskLayer::Flush()
}
catch (const std::exception& Ex)
{
+ AbortFlag.store(true);
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
+
Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
ZEN_UNUSED(IsAborted, IsPaused);
ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);