aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-02-28 11:10:34 +0100
committerGitHub <[email protected]>2024-02-28 11:10:34 +0100
commitbd3b270b57eafc626ca42efea3be8c460761c6ca (patch)
treebea450cfeb1c0f092d143e2f3283c65fe432617d /src
parentadd disk caching to block move (#661) (diff)
downloadzen-bd3b270b57eafc626ca42efea3be8c460761c6ca.tar.xz
zen-bd3b270b57eafc626ca42efea3be8c460761c6ca.zip
Make sure we wait for all scheduled tasks to complete before throwing exceptions further (#662)
Bugfix: We must not throw exceptions to calling function until all async work we spawned has returned
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/projectstore.cpp4
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp40
-rw-r--r--src/zenstore/cas.cpp7
-rw-r--r--src/zenstore/compactcas.cpp8
-rw-r--r--src/zenstore/filecas.cpp8
-rw-r--r--src/zenstore/gc.cpp136
-rw-r--r--src/zenstore/include/zenstore/gc.h4
7 files changed, 145 insertions, 62 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 7d4ce3809..b255ac9e6 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3344,7 +3344,7 @@ ProjectStore::AreDiskWritesAllowed() const
std::string
ProjectStore::GetGcName(GcCtx&)
{
- return fmt::format("projectstore:'{}'", m_ProjectBasePath.string());
+ return fmt::format("projectstore: '{}'", m_ProjectBasePath.string());
}
class ProjectStoreGcStoreCompactor : public GcStoreCompactor
@@ -3582,6 +3582,8 @@ public:
m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); });
}
+ virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); }
+
virtual void PreCache(GcCtx& Ctx) override
{
if (m_PreCache)
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 1ffc959e7..b9cb89fc9 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -2647,7 +2647,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const
std::string
ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&)
{
- return fmt::format("cachebucket:'{}'", m_BucketDir.string());
+ return fmt::format("cachebucket: '{}'", m_BucketDir.string());
}
class DiskBucketStoreCompactor : public GcStoreCompactor
@@ -3039,6 +3039,8 @@ public:
}
}
+ virtual std::string GetGcName(GcCtx& Ctx) override { return m_CacheBucket.GetGcName(Ctx); }
+
virtual void PreCache(GcCtx& Ctx) override
{
ZEN_TRACE_CPU("Z$::Bucket::PreCache");
@@ -3510,8 +3512,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
{
WorkLatch.AddCount(1);
Pool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
-
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
const std::string BucketName = PathToUtf8(BucketPath.stem());
try
{
@@ -3621,13 +3622,27 @@ ZenCacheDiskLayer::Flush()
{
WorkerThreadPool& Pool = GetSmallWorkerPool();
Latch WorkLatch(1);
- for (auto& Bucket : Buckets)
+ try
{
- WorkLatch.AddCount(1);
- Pool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
- Bucket->Flush();
- });
+ for (auto& Bucket : Buckets)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ try
+ {
+ Bucket->Flush();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed flushing bucket. Reason: '{}'", Ex.what());
+ }
+ });
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
WorkLatch.CountDown();
while (!WorkLatch.Wait(1000))
@@ -3660,6 +3675,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx)
for (auto& Result : Results)
{
+ if (Result.valid())
+ {
+ Result.wait();
+ }
+ }
+ for (auto& Result : Results)
+ {
Result.get();
}
}
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index b20f2049a..f1a141ca0 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -119,6 +119,13 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
}}));
for (std::future<void>& Result : Work)
{
+ if (Result.valid())
+ {
+ Result.wait();
+ }
+ }
+ for (std::future<void>& Result : Work)
+ {
Result.get();
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index b308ddd4f..c8fb41ffc 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -699,7 +699,11 @@ public:
{
}
- virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences)
+ virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); }
+
+ virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx,
+ GcStats& Stats,
+ const GetUnusedReferencesFunc& GetUnusedReferences) override
{
ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData");
@@ -778,7 +782,7 @@ private:
std::string
CasContainerStrategy::GetGcName(GcCtx&)
{
- return fmt::format("compactcas:'{}'", (m_RootDirectory / m_ContainerBaseName).string());
+ return fmt::format("compactcas: '{}'", (m_RootDirectory / m_ContainerBaseName).string());
}
GcReferencePruner*
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 7aa180b68..af0a3a176 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -1531,7 +1531,11 @@ class FileCasReferencePruner : public GcReferencePruner
public:
FileCasReferencePruner(FileCasStrategy& Owner, std::vector<IoHash>&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {}
- virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences)
+ virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); }
+
+ virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx,
+ GcStats& Stats,
+ const GetUnusedReferencesFunc& GetUnusedReferences) override
{
ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData");
@@ -1606,7 +1610,7 @@ private:
std::string
FileCasStrategy::GetGcName(GcCtx&)
{
- return fmt::format("filecas:'{}'", m_RootDirectory.string());
+ return fmt::format("filecas: '{}'", m_RootDirectory.string());
}
GcReferencePruner*
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 8c02e82c5..f6469c51d 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -633,7 +633,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
{
// First remove any cache keys that may own references
SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); if (Ctx.Settings.Verbose) {
- ZEN_INFO("GCV2: Removed epxired data for {} referenceners in {}",
+ ZEN_INFO("GCV2: Removed expired data for {} referenceners in {}",
m_GcReferencers.size(),
NiceTimeSpanMs(Result.RemoveExpiredDataMS.count()));
});
@@ -648,16 +648,24 @@ GcManager::CollectGarbage(const GcSettings& Settings)
GcReferencer* Owner = m_GcReferencers[Index];
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Owner, Stats, &StoreCompactorsLock, &StoreCompactors]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- Stats->first = Owner->GetGcName(Ctx);
- SCOPED_TIMER(Stats->second.RemoveExpiredDataStats.ElapsedMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- std::unique_ptr<GcStoreCompactor> StoreCompactor(
- Owner->RemoveExpiredData(Ctx, Stats->second.RemoveExpiredDataStats));
- if (StoreCompactor)
+ ThreadPool.ScheduleWork([this, &Ctx, &WorkLeft, Owner, Stats, &StoreCompactorsLock, &StoreCompactors]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ try
{
- RwLock::ExclusiveLockScope __(StoreCompactorsLock);
- StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->second.CompactStoreStats);
+ Stats->first = Owner->GetGcName(Ctx);
+ SCOPED_TIMER(Stats->second.RemoveExpiredDataStats.ElapsedMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ std::unique_ptr<GcStoreCompactor> StoreCompactor(
+ Owner->RemoveExpiredData(Ctx, Stats->second.RemoveExpiredDataStats));
+ if (StoreCompactor)
+ {
+ RwLock::ExclusiveLockScope __(StoreCompactorsLock);
+ StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->second.CompactStoreStats);
+ }
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("GCV2: Failed removing expired data for {}. Reason: '{}'", Owner->GetGcName(Ctx), Ex.what());
}
});
}
@@ -706,21 +714,30 @@ GcManager::CollectGarbage(const GcSettings& Settings)
std::pair<std::string, GcReferenceStoreStats>* Stats = &Result.ReferenceStoreStats[Index];
WorkLeft.AddCount(1);
ThreadPool.ScheduleWork(
- [&Ctx, ReferenceStore, Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- Stats->first = ReferenceStore->GetGcName(Ctx);
- std::unique_ptr<GcReferencePruner> ReferencePruner;
+ [this, &Ctx, ReferenceStore, Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ try
{
- SCOPED_TIMER(Stats->second.CreateReferencePrunersMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- // The ReferenceStore will pick a list of CId entries to check, returning a collector
- ReferencePruner =
- std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats->second));
+ Stats->first = ReferenceStore->GetGcName(Ctx);
+ std::unique_ptr<GcReferencePruner> ReferencePruner;
+ {
+ SCOPED_TIMER(Stats->second.CreateReferencePrunersMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ // The ReferenceStore will pick a list of CId entries to check, returning a collector
+ ReferencePruner =
+ std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats->second));
+ }
+ if (ReferencePruner)
+ {
+ RwLock::ExclusiveLockScope __(ReferencePrunersLock);
+ ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner));
+ }
}
- if (ReferencePruner)
+ catch (std::exception& Ex)
{
- RwLock::ExclusiveLockScope __(ReferencePrunersLock);
- ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner));
+ ZEN_ERROR("GCV2: Failed creating reference pruners for {}. Reason: '{}'",
+ ReferenceStore->GetGcName(Ctx),
+ Ex.what());
}
});
}
@@ -767,18 +784,18 @@ GcManager::CollectGarbage(const GcSettings& Settings)
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
ThreadPool.ScheduleWork(
- [&Ctx, &WorkLeft, Referencer, Index, Stats, &ReferenceCheckersLock, &ReferenceCheckers]() {
+ [this, &Ctx, &WorkLeft, Referencer, Index, Stats, &ReferenceCheckersLock, &ReferenceCheckers]() {
auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- // The Referencer will create a reference checker that guarrantees that the references do not change as
+ // The Referencer will create a reference checker that guarantees that the references do not change as
// long as it lives
std::vector<GcReferenceChecker*> Checkers;
- {
- SCOPED_TIMER(Stats->second.CreateReferenceCheckersMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checkers = Referencer->CreateReferenceCheckers(Ctx);
- }
try
{
+ {
+ SCOPED_TIMER(Stats->second.CreateReferenceCheckersMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checkers = Referencer->CreateReferenceCheckers(Ctx);
+ }
if (!Checkers.empty())
{
RwLock::ExclusiveLockScope __(ReferenceCheckersLock);
@@ -789,14 +806,16 @@ GcManager::CollectGarbage(const GcSettings& Settings)
}
}
}
- catch (std::exception&)
+ catch (std::exception& Ex)
{
+ ZEN_ERROR("GCV2: Failed creating reference checkers for {}. Reason: '{}'",
+ Referencer->GetGcName(Ctx),
+ Ex.what());
while (!Checkers.empty())
{
delete Checkers.back();
Checkers.pop_back();
}
- throw;
}
});
}
@@ -837,10 +856,17 @@ GcManager::CollectGarbage(const GcSettings& Settings)
size_t Index = It.second;
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, Checker, Index, Stats, &WorkLeft]() {
+ ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- SCOPED_TIMER(Stats->second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->PreCache(Ctx);
+ try
+ {
+ SCOPED_TIMER(Stats->second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->PreCache(Ctx);
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("GCV2: Failed precaching for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what());
+ }
});
}
WorkLeft.CountDown();
@@ -886,10 +912,17 @@ GcManager::CollectGarbage(const GcSettings& Settings)
size_t Index = It.second;
std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, Checker, Index, Stats, &WorkLeft]() {
+ ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->LockState(Ctx);
+ try
+ {
+ SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->LockState(Ctx);
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what());
+ }
});
}
WorkLeft.CountDown();
@@ -943,23 +976,30 @@ GcManager::CollectGarbage(const GcSettings& Settings)
GcReferenceStoreStats* Stats = &Result.ReferenceStoreStats[Index].second;
WorkLeft.AddCount(1);
ThreadPool.ScheduleWork(
- [&Ctx, Pruner, Stats, &WorkLeft, &GetUnusedReferences, &StoreCompactorsLock, &StoreCompactors]() {
+ [this, &Ctx, Pruner, Stats, &WorkLeft, &GetUnusedReferences, &StoreCompactorsLock, &StoreCompactors]() {
auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
// Go through all the ReferenceCheckers to see if the list of Cids the collector selected are
// referenced or not.
- std::unique_ptr<GcStoreCompactor> StoreCompactor;
+ try
{
- SCOPED_TIMER(Stats->RemoveUnreferencedDataStats.ElapsedMS =
- std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- StoreCompactor = std::unique_ptr<GcStoreCompactor>(
- Pruner->RemoveUnreferencedData(Ctx,
- Stats->RemoveUnreferencedDataStats,
- GetUnusedReferences));
+ std::unique_ptr<GcStoreCompactor> StoreCompactor;
+ {
+ SCOPED_TIMER(Stats->RemoveUnreferencedDataStats.ElapsedMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ StoreCompactor = std::unique_ptr<GcStoreCompactor>(
+ Pruner->RemoveUnreferencedData(Ctx,
+ Stats->RemoveUnreferencedDataStats,
+ GetUnusedReferences));
+ }
+ if (StoreCompactor)
+ {
+ RwLock::ExclusiveLockScope __(StoreCompactorsLock);
+ StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->CompactStoreStats);
+ }
}
- if (StoreCompactor)
+ catch (std::exception& Ex)
{
- RwLock::ExclusiveLockScope __(StoreCompactorsLock);
- StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->CompactStoreStats);
+ ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Pruner->GetGcName(Ctx), Ex.what());
}
});
}
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 30dd97ce8..b7613883a 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -173,6 +173,8 @@ public:
// Destructor should unlock what was locked in LockState
virtual ~GcReferenceChecker() = default;
+ virtual std::string GetGcName(GcCtx& Ctx) = 0;
+
virtual void PreCache(GcCtx& Ctx) = 0;
// Lock the state and make sure no references changes, usually a read-lock is taken until the destruction
@@ -215,6 +217,8 @@ class GcReferencePruner
public:
virtual ~GcReferencePruner() = default;
+ virtual std::string GetGcName(GcCtx& Ctx) = 0;
+
typedef std::function<std::vector<IoHash>(std::span<IoHash> References)> GetUnusedReferencesFunc;
// Check a set of references to see if they are in use.