diff options
| author | Dan Engelbrecht <[email protected]> | 2024-02-28 11:10:34 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-28 11:10:34 +0100 |
| commit | bd3b270b57eafc626ca42efea3be8c460761c6ca (patch) | |
| tree | bea450cfeb1c0f092d143e2f3283c65fe432617d /src | |
| parent | add disk caching to block move (#661) (diff) | |
| download | zen-bd3b270b57eafc626ca42efea3be8c460761c6ca.tar.xz zen-bd3b270b57eafc626ca42efea3be8c460761c6ca.zip | |
Make sure we wait for all scheduled tasks to complete before throwing exceptions further (#662)
Bugfix: We must not throw exceptions to calling function until all async work we spawned has returned
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 40 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 8 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 8 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 136 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 4 |
7 files changed, 145 insertions, 62 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 7d4ce3809..b255ac9e6 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3344,7 +3344,7 @@ ProjectStore::AreDiskWritesAllowed() const std::string ProjectStore::GetGcName(GcCtx&) { - return fmt::format("projectstore:'{}'", m_ProjectBasePath.string()); + return fmt::format("projectstore: '{}'", m_ProjectBasePath.string()); } class ProjectStoreGcStoreCompactor : public GcStoreCompactor @@ -3582,6 +3582,8 @@ public: m_Oplog.m_OplogLock.WithExclusiveLock([&]() { m_Oplog.m_UpdatedLSNs.reset(); }); } + virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); } + virtual void PreCache(GcCtx& Ctx) override { if (m_PreCache) diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 1ffc959e7..b9cb89fc9 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -2647,7 +2647,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const std::string ZenCacheDiskLayer::CacheBucket::GetGcName(GcCtx&) { - return fmt::format("cachebucket:'{}'", m_BucketDir.string()); + return fmt::format("cachebucket: '{}'", m_BucketDir.string()); } class DiskBucketStoreCompactor : public GcStoreCompactor @@ -3039,6 +3039,8 @@ public: } } + virtual std::string GetGcName(GcCtx& Ctx) override { return m_CacheBucket.GetGcName(Ctx); } + virtual void PreCache(GcCtx& Ctx) override { ZEN_TRACE_CPU("Z$::Bucket::PreCache"); @@ -3510,8 +3512,7 @@ ZenCacheDiskLayer::DiscoverBuckets() { WorkLatch.AddCount(1); Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); const std::string BucketName = PathToUtf8(BucketPath.stem()); try { @@ -3621,13 +3622,27 @@ ZenCacheDiskLayer::Flush() { WorkerThreadPool& Pool = GetSmallWorkerPool(); Latch WorkLatch(1); - for (auto& Bucket : Buckets) + try { - WorkLatch.AddCount(1); - Pool.ScheduleWork([&]() { - auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); - Bucket->Flush(); - }); + for (auto& Bucket : Buckets) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + try + { + Bucket->Flush(); + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed flushing bucket. Reason: '{}'", Ex.what()); + } + }); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } WorkLatch.CountDown(); while (!WorkLatch.Wait(1000)) @@ -3660,6 +3675,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Result : Results) { + if (Result.valid()) + { + Result.wait(); + } + } + for (auto& Result : Results) + { Result.get(); } } diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index b20f2049a..f1a141ca0 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -119,6 +119,13 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) }})); for (std::future<void>& Result : Work) { + if (Result.valid()) + { + Result.wait(); + } + } + for (std::future<void>& Result : Work) + { Result.get(); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index b308ddd4f..c8fb41ffc 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -699,7 +699,11 @@ public: { } - virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) + virtual std::string GetGcName(GcCtx& Ctx) override { return m_CasContainerStrategy.GetGcName(Ctx); } + + virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, + GcStats& Stats, + const GetUnusedReferencesFunc& GetUnusedReferences) override { ZEN_TRACE_CPU("CasContainer::RemoveUnreferencedData"); @@ -778,7 +782,7 @@ private: std::string CasContainerStrategy::GetGcName(GcCtx&) { - return fmt::format("compactcas:'{}'", (m_RootDirectory / m_ContainerBaseName).string()); + return fmt::format("compactcas: '{}'", (m_RootDirectory / m_ContainerBaseName).string()); } GcReferencePruner* diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 7aa180b68..af0a3a176 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -1531,7 +1531,11 @@ class FileCasReferencePruner : public GcReferencePruner public: FileCasReferencePruner(FileCasStrategy& Owner, std::vector<IoHash>&& Cids) : m_FileCasStrategy(Owner), m_Cids(std::move(Cids)) {} - virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, GcStats& Stats, const GetUnusedReferencesFunc& GetUnusedReferences) + virtual std::string GetGcName(GcCtx& Ctx) override { return m_FileCasStrategy.GetGcName(Ctx); } + + virtual GcStoreCompactor* RemoveUnreferencedData(GcCtx& Ctx, + GcStats& Stats, + const GetUnusedReferencesFunc& GetUnusedReferences) override { ZEN_TRACE_CPU("FileCas::RemoveUnreferencedData"); @@ -1606,7 +1610,7 @@ private: std::string FileCasStrategy::GetGcName(GcCtx&) { - return fmt::format("filecas:'{}'", m_RootDirectory.string()); + return fmt::format("filecas: '{}'", m_RootDirectory.string()); } GcReferencePruner* diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 8c02e82c5..f6469c51d 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -633,7 +633,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) { // First remove any cache keys that may own references SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); if (Ctx.Settings.Verbose) { - ZEN_INFO("GCV2: Removed epxired data for {} referenceners in {}", + ZEN_INFO("GCV2: Removed expired data for {} referenceners in {}", m_GcReferencers.size(), NiceTimeSpanMs(Result.RemoveExpiredDataMS.count())); }); @@ -648,16 +648,24 @@ GcManager::CollectGarbage(const GcSettings& Settings) GcReferencer* Owner = m_GcReferencers[Index]; std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Owner, Stats, &StoreCompactorsLock, &StoreCompactors]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - Stats->first = Owner->GetGcName(Ctx); - SCOPED_TIMER(Stats->second.RemoveExpiredDataStats.ElapsedMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - std::unique_ptr<GcStoreCompactor> StoreCompactor( - Owner->RemoveExpiredData(Ctx, Stats->second.RemoveExpiredDataStats)); - if (StoreCompactor) + ThreadPool.ScheduleWork([this, &Ctx, &WorkLeft, Owner, Stats, &StoreCompactorsLock, &StoreCompactors]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try { - RwLock::ExclusiveLockScope __(StoreCompactorsLock); - StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->second.CompactStoreStats); + Stats->first = Owner->GetGcName(Ctx); + SCOPED_TIMER(Stats->second.RemoveExpiredDataStats.ElapsedMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + std::unique_ptr<GcStoreCompactor> StoreCompactor( + Owner->RemoveExpiredData(Ctx, Stats->second.RemoveExpiredDataStats)); + if (StoreCompactor) + { + RwLock::ExclusiveLockScope __(StoreCompactorsLock); + StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->second.CompactStoreStats); + } + } + catch (std::exception& Ex) + { + ZEN_ERROR("GCV2: Failed removing expired data for {}. Reason: '{}'", Owner->GetGcName(Ctx), Ex.what()); } }); } @@ -706,21 +714,30 @@ GcManager::CollectGarbage(const GcSettings& Settings) std::pair<std::string, GcReferenceStoreStats>* Stats = &Result.ReferenceStoreStats[Index]; WorkLeft.AddCount(1); ThreadPool.ScheduleWork( - [&Ctx, ReferenceStore, Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - Stats->first = ReferenceStore->GetGcName(Ctx); - std::unique_ptr<GcReferencePruner> ReferencePruner; + [this, &Ctx, ReferenceStore, Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try { - SCOPED_TIMER(Stats->second.CreateReferencePrunersMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - // The ReferenceStore will pick a list of CId entries to check, returning a collector - ReferencePruner = - std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats->second)); + Stats->first = ReferenceStore->GetGcName(Ctx); + std::unique_ptr<GcReferencePruner> ReferencePruner; + { + SCOPED_TIMER(Stats->second.CreateReferencePrunersMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + // The ReferenceStore will pick a list of CId entries to check, returning a collector + ReferencePruner = + std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats->second)); + } + if (ReferencePruner) + { + RwLock::ExclusiveLockScope __(ReferencePrunersLock); + ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner)); + } } - if (ReferencePruner) + catch (std::exception& Ex) { - RwLock::ExclusiveLockScope __(ReferencePrunersLock); - ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner)); + ZEN_ERROR("GCV2: Failed creating reference pruners for {}. Reason: '{}'", + ReferenceStore->GetGcName(Ctx), + Ex.what()); } }); } @@ -767,18 +784,18 @@ GcManager::CollectGarbage(const GcSettings& Settings) std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); ThreadPool.ScheduleWork( - [&Ctx, &WorkLeft, Referencer, Index, Stats, &ReferenceCheckersLock, &ReferenceCheckers]() { + [this, &Ctx, &WorkLeft, Referencer, Index, Stats, &ReferenceCheckersLock, &ReferenceCheckers]() { auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - // The Referencer will create a reference checker that guarrantees that the references do not change as + // The Referencer will create a reference checker that guarantees that the references do not change as // long as it lives std::vector<GcReferenceChecker*> Checkers; - { - SCOPED_TIMER(Stats->second.CreateReferenceCheckersMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checkers = Referencer->CreateReferenceCheckers(Ctx); - } try { + { + SCOPED_TIMER(Stats->second.CreateReferenceCheckersMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checkers = Referencer->CreateReferenceCheckers(Ctx); + } if (!Checkers.empty()) { RwLock::ExclusiveLockScope __(ReferenceCheckersLock); @@ -789,14 +806,16 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } } - catch (std::exception&) + catch (std::exception& Ex) { + ZEN_ERROR("GCV2: Failed creating reference checkers for {}. Reason: '{}'", + Referencer->GetGcName(Ctx), + Ex.what()); while (!Checkers.empty()) { delete Checkers.back(); Checkers.pop_back(); } - throw; } }); } @@ -837,10 +856,17 @@ GcManager::CollectGarbage(const GcSettings& Settings) size_t Index = It.second; std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, Checker, Index, Stats, &WorkLeft]() { + ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - SCOPED_TIMER(Stats->second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->PreCache(Ctx); + try + { + SCOPED_TIMER(Stats->second.PreCacheStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->PreCache(Ctx); + } + catch (std::exception& Ex) + { + ZEN_ERROR("GCV2: Failed precaching for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + } }); } WorkLeft.CountDown(); @@ -886,10 +912,17 @@ GcManager::CollectGarbage(const GcSettings& Settings) size_t Index = It.second; std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, Checker, Index, Stats, &WorkLeft]() { + ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->LockState(Ctx); + try + { + SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->LockState(Ctx); + } + catch (std::exception& Ex) + { + ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); + } }); } WorkLeft.CountDown(); @@ -943,23 +976,30 @@ GcManager::CollectGarbage(const GcSettings& Settings) GcReferenceStoreStats* Stats = &Result.ReferenceStoreStats[Index].second; WorkLeft.AddCount(1); ThreadPool.ScheduleWork( - [&Ctx, Pruner, Stats, &WorkLeft, &GetUnusedReferences, &StoreCompactorsLock, &StoreCompactors]() { + [this, &Ctx, Pruner, Stats, &WorkLeft, &GetUnusedReferences, &StoreCompactorsLock, &StoreCompactors]() { auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are // referenced or not. - std::unique_ptr<GcStoreCompactor> StoreCompactor; + try { - SCOPED_TIMER(Stats->RemoveUnreferencedDataStats.ElapsedMS = - std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - StoreCompactor = std::unique_ptr<GcStoreCompactor>( - Pruner->RemoveUnreferencedData(Ctx, - Stats->RemoveUnreferencedDataStats, - GetUnusedReferences)); + std::unique_ptr<GcStoreCompactor> StoreCompactor; + { + SCOPED_TIMER(Stats->RemoveUnreferencedDataStats.ElapsedMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + StoreCompactor = std::unique_ptr<GcStoreCompactor>( + Pruner->RemoveUnreferencedData(Ctx, + Stats->RemoveUnreferencedDataStats, + GetUnusedReferences)); + } + if (StoreCompactor) + { + RwLock::ExclusiveLockScope __(StoreCompactorsLock); + StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->CompactStoreStats); + } } - if (StoreCompactor) + catch (std::exception& Ex) { - RwLock::ExclusiveLockScope __(StoreCompactorsLock); - StoreCompactors.insert_or_assign(std::move(StoreCompactor), &Stats->CompactStoreStats); + ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Pruner->GetGcName(Ctx), Ex.what()); } }); } diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 30dd97ce8..b7613883a 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -173,6 +173,8 @@ public: // Destructor should unlock what was locked in LockState virtual ~GcReferenceChecker() = default; + virtual std::string GetGcName(GcCtx& Ctx) = 0; + virtual void PreCache(GcCtx& Ctx) = 0; // Lock the state and make sure no references changes, usually a read-lock is taken until the destruction @@ -215,6 +217,8 @@ class GcReferencePruner public: virtual ~GcReferencePruner() = default; + virtual std::string GetGcName(GcCtx& Ctx) = 0; + typedef std::function<std::vector<IoHash>(std::span<IoHash> References)> GetUnusedReferencesFunc; // Check a set of references to see if they are in use. |