diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-06 11:38:56 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-06 11:38:56 +0100 |
| commit | ba972a77cb75facefd3f0da962cf34ea5a391884 (patch) | |
| tree | 9e0fe9ad41dd6ba20b966065eaa9f1dc214ab531 | |
| parent | keep a "null" iobuffer core to reduce redundant memory allocations (#507) (diff) | |
| download | zen-ba972a77cb75facefd3f0da962cf34ea5a391884.tar.xz zen-ba972a77cb75facefd3f0da962cf34ea5a391884.zip | |
multithread cache bucket (#508)
* Multithread init and flush of cache bucket
* tweaked threading cound for bucket discovery, disklayer flush and gc v2
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 87 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 504 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 9 |
4 files changed, 324 insertions, 277 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b00754320..f80082e32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ - Feature: New options for zen command `gc-status` - `--details` that enables the detailed output from the last GC operation when using GC V2 - Feature: New garbage collection implementation, still in evaluation mode. Enabled by `--gc-v2` command line option +- Improvement: Multithread init and flush of cache bucket for faster startup and exit - Bugfix: Build script now sets up arch properly when running tests on MacOS ## 0.2.30 diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index fa80ed414..c0efdc76d 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -945,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::Flush() } auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); }); + ZEN_INFO("Flushing bucket {}", m_BucketDir); + m_BlockStore.Flush(/*ForceNewBlock*/ false); m_SlogFile.Flush(); @@ -3018,6 +3020,7 @@ ZenCacheDiskLayer::DiscoverBuckets() // Initialize buckets std::vector<std::filesystem::path> BadBucketDirectories; + std::vector<std::filesystem::path> FoundBucketDirectories; RwLock::ExclusiveLockScope _(m_Lock); @@ -3037,26 +3040,7 @@ ZenCacheDiskLayer::DiscoverBuckets() continue; } - auto InsertResult = - m_Buckets.emplace(BucketName, - std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig)); - CacheBucket& Bucket = *InsertResult.first->second; - - try - { - if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false)) - { - ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); - - m_Buckets.erase(InsertResult.first); - continue; - } - } - catch (const std::exception& Err) - { - ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); - return; - } + FoundBucketDirectories.push_back(BucketPath); ZEN_INFO("Discovered bucket '{}'", BucketName); } @@ -3081,6 +3065,53 @@ ZenCacheDiskLayer::DiscoverBuckets() ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath); } } + + RwLock SyncLock; + + const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); + const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size())); + + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); + for (auto& BucketPath : FoundBucketDirectories) + { + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + + const std::string BucketName = PathToUtf8(BucketPath.stem()); + try + { + std::unique_ptr<CacheBucket> NewBucket = + std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig); + + CacheBucket* Bucket = nullptr; + { + RwLock::ExclusiveLockScope __(SyncLock); + auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket)); + Bucket = InsertResult.first->second.get(); + } + ZEN_ASSERT(Bucket); + + if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false)) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir); + + { + RwLock::ExclusiveLockScope __(SyncLock); + m_Buckets.erase(BucketName); + } + } + } + catch (const std::exception& Err) + { + ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what()); + return; + } + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); } bool @@ -3133,6 +3164,10 @@ ZenCacheDiskLayer::Flush() { RwLock::SharedLockScope _(m_Lock); + if (m_Buckets.empty()) + { + return; + } Buckets.reserve(m_Buckets.size()); for (auto& Kv : m_Buckets) { @@ -3140,11 +3175,21 @@ ZenCacheDiskLayer::Flush() Buckets.push_back(Bucket); } } + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); + WorkerThreadPool Pool(WorkerThreadPoolCount); + Latch WorkLatch(1); for (auto& Bucket : Buckets) { - Bucket->Flush(); + WorkLatch.AddCount(1); + Pool.ScheduleWork([&]() { + auto _ = MakeGuard([&]() { WorkLatch.CountDown(); }); + Bucket->Flush(); + }); } + WorkLatch.CountDown(); + WorkLatch.Wait(); } void diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 4d146c16c..f319475f3 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -327,7 +327,7 @@ GcManager::~GcManager() { } -//////// Begin New GC WIP +//////// Begin GC V2 void GcResult::Sum() @@ -396,6 +396,10 @@ GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore) std::erase_if(m_GcReferenceStores, [&](GcReferenceStore* $) { return $ == &ReferenceStore; }); } +#define SCOPED_TIMER(closure) \ + Stopwatch $Timer##__LINE__; \ + auto $Guard##__LINE = MakeGuard([&, &Timer = $Timer##__LINE__]() { closure }) + GcResult GcManager::CollectGarbage(const GcSettings& Settings) { @@ -408,264 +412,260 @@ GcManager::CollectGarbage(const GcSettings& Settings) RwLock::SharedLockScope GcLock(m_Lock); - static const bool SingleThread = -#if ZEN_BUILD_DEBUG - true -#else - false -#endif - ; + int WorkerThreadPoolCount = 0; + if (!Settings.SingleThread) + { + const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); + WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, m_GcReferencers.size())); + } -#define SCOPED_TIMER(closure) \ - Stopwatch $Timer##__LINE__; \ - auto $Guard##__LINE = MakeGuard([&, &Timer = $Timer##__LINE__]() { closure }) + Result.ReferencerStats.resize(m_GcReferencers.size()); + Result.ReferenceStoreStats.resize(m_GcReferenceStores.size()); - Result.ReferencerStats.resize(m_GcReferencers.size()); - Result.ReferenceStoreStats.resize(m_GcReferenceStores.size()); - - WorkerThreadPool ThreadPool(SingleThread ? 0 : 8); - - ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); - if (!m_GcReferencers.empty()) - { - Latch WorkLeft(1); - // First remove any cache keys that may own references - SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) - { - GcReferencer* Owner = m_GcReferencers[Index]; - std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, Owner, &Stats, &WorkLeft]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - Stats.first = Owner->GetGcName(Ctx); - SCOPED_TIMER(Stats.second.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Owner->RemoveExpiredData(Ctx, Stats.second); - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - - if (Ctx.Settings.SkipCidDelete) - { - Result.Sum(); - return Result; - } - - ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size()); - std::unordered_map<size_t, std::unique_ptr<GcReferencePruner>> ReferencePruners; - if (!m_GcReferenceStores.empty()) - { - ReferencePruners.reserve(m_GcReferenceStores.size()); - Latch WorkLeft(1); - RwLock ReferencePrunersLock; - // CreateReferencePruner is usually not very heavy but big data sets change that - SCOPED_TIMER(Result.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++) - { - GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index]; - std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, ReferenceStore, &Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - Stats.first = ReferenceStore->GetGcName(Ctx); - std::unique_ptr<GcReferencePruner> ReferencePruner; - { - SCOPED_TIMER(Stats.second.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - // The ReferenceStore will pick a list of CId entries to check, returning a collector - ReferencePruner = std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats.second)); - } - if (ReferencePruner) - { - RwLock::ExclusiveLockScope __(ReferencePrunersLock); - ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner)); - } - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - - ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size()); - std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers; - if (!m_GcReferencers.empty()) - { - ReferenceCheckers.reserve(m_GcReferencers.size()); - Latch WorkLeft(1); - RwLock ReferenceCheckersLock; - SCOPED_TIMER(Result.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - // Lock all reference owners from changing the reference data and get access to check for referenced data - for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) - { - GcReferencer* Referencer = m_GcReferencers[Index]; - std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, Index, &Stats, &ReferenceCheckersLock, &ReferenceCheckers]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - // The Referencer will create a reference checker that guarrantees that the references do not change as long as it lives - std::vector<GcReferenceChecker*> Checkers; - { - SCOPED_TIMER(Stats.second.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checkers = Referencer->CreateReferenceCheckers(Ctx); - } - try - { - if (!Checkers.empty()) - { - RwLock::ExclusiveLockScope __(ReferenceCheckersLock); - for (auto& Checker : Checkers) - { - ReferenceCheckers.insert_or_assign(std::unique_ptr<GcReferenceChecker>(Checker), Index); - Checker = nullptr; - } - } - } - catch (std::exception&) - { - while (!Checkers.empty()) - { - delete Checkers.back(); - Checkers.pop_back(); - } - throw; - } - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - - std::unordered_map<std::unique_ptr<GcReferenceStoreCompactor>, size_t> ReferenceStoreCompactors; - ReferenceStoreCompactors.reserve(ReferencePruners.size()); - - ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); - { - SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS); - ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS))); - if (!ReferenceCheckers.empty()) - { - // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until - // we delete the ReferenceCheckers - Latch WorkLeft(1); - - SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - for (auto& It : ReferenceCheckers) - { - GcReferenceChecker* Checker = It.first.get(); - size_t Index = It.second; - std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, Checker, Index, &Stats, &WorkLeft]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - SCOPED_TIMER(Stats.second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->LockState(Ctx); - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - - ZEN_INFO("GCV2: Removing unreferenced data for {} reference pruners", ReferencePruners.size()); - if (!ReferencePruners.empty()) - { - const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> { - HashSet UnusedCids(References.begin(), References.end()); - for (const auto& It : ReferenceCheckers) - { - GcReferenceChecker* ReferenceChecker = It.first.get(); - ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids); - if (UnusedCids.empty()) - { - return {}; - } - } - return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end()); - }; - - // checking all Cids agains references in cache - // Ask stores to remove data that the ReferenceCheckers says are not referenced - this should be a lightweight operation - // that only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors - - Latch WorkLeft(1); - RwLock ReferenceStoreCompactorsLock; - - SCOPED_TIMER(Result.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - for (auto& It : ReferencePruners) - { - GcReferencePruner* Pruner = It.second.get(); - size_t Index = It.first; - GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, - Pruner, - &Stats, - &WorkLeft, - Index, - &GetUnusedReferences, - &ReferenceStoreCompactorsLock, - &ReferenceStoreCompactors]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. - std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor; - { - SCOPED_TIMER(Stats.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - ReferenceCompactor = - std::unique_ptr<GcReferenceStoreCompactor>(Pruner->RemoveUnreferencedData(Ctx, Stats, GetUnusedReferences)); - } - if (ReferenceCompactor) - { - RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock); - ReferenceStoreCompactors.insert_or_assign(std::move(ReferenceCompactor), Index); - } - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed - ReferenceCheckers.clear(); - } - - // Let go of the pruners - ReferencePruners.clear(); - - ZEN_INFO("GCV2: Compacting reference stores for {} reference store compactors", ReferenceStoreCompactors.size()); - if (!ReferenceStoreCompactors.empty()) - { - Latch WorkLeft(1); - - // Remove the stuff we deemed unreferenced from disk - may be heavy operation - SCOPED_TIMER(Result.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - for (auto& It : ReferenceStoreCompactors) - { - GcReferenceStoreCompactor* Compactor = It.first.get(); - size_t Index = It.second; - GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([&Ctx, Compactor, &Stats, &WorkLeft]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. - SCOPED_TIMER(Stats.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Compactor->CompactReferenceStore(Ctx, Stats); - }); - } - WorkLeft.CountDown(); - WorkLeft.Wait(); - } - - ReferenceStoreCompactors.clear(); - - ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); - } + WorkerThreadPool ThreadPool(WorkerThreadPoolCount); + + ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); + if (!m_GcReferencers.empty()) + { + Latch WorkLeft(1); + // First remove any cache keys that may own references + SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) + { + GcReferencer* Owner = m_GcReferencers[Index]; + std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Owner, &Stats, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + Stats.first = Owner->GetGcName(Ctx); + SCOPED_TIMER(Stats.second.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Owner->RemoveExpiredData(Ctx, Stats.second); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + if (Ctx.Settings.SkipCidDelete) + { + Result.Sum(); + return Result; + } + + ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size()); + std::unordered_map<size_t, std::unique_ptr<GcReferencePruner>> ReferencePruners; + if (!m_GcReferenceStores.empty()) + { + ReferencePruners.reserve(m_GcReferenceStores.size()); + Latch WorkLeft(1); + RwLock ReferencePrunersLock; + // CreateReferencePruner is usually not very heavy but big data sets change that + SCOPED_TIMER(Result.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++) + { + GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index]; + std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, ReferenceStore, &Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + Stats.first = ReferenceStore->GetGcName(Ctx); + std::unique_ptr<GcReferencePruner> ReferencePruner; + { + SCOPED_TIMER(Stats.second.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + // The ReferenceStore will pick a list of CId entries to check, returning a collector + ReferencePruner = std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats.second)); + } + if (ReferencePruner) + { + RwLock::ExclusiveLockScope __(ReferencePrunersLock); + ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner)); + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size()); + std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers; + if (!m_GcReferencers.empty()) + { + ReferenceCheckers.reserve(m_GcReferencers.size()); + Latch WorkLeft(1); + RwLock ReferenceCheckersLock; + SCOPED_TIMER(Result.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + // Lock all reference owners from changing the reference data and get access to check for referenced data + for (size_t Index = 0; Index < m_GcReferencers.size(); Index++) + { + GcReferencer* Referencer = m_GcReferencers[Index]; + std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, Index, &Stats, &ReferenceCheckersLock, &ReferenceCheckers]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // The Referencer will create a reference checker that guarrantees that the references do not change as long as it lives + std::vector<GcReferenceChecker*> Checkers; + { + SCOPED_TIMER(Stats.second.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checkers = Referencer->CreateReferenceCheckers(Ctx); + } + try + { + if (!Checkers.empty()) + { + RwLock::ExclusiveLockScope __(ReferenceCheckersLock); + for (auto& Checker : Checkers) + { + ReferenceCheckers.insert_or_assign(std::unique_ptr<GcReferenceChecker>(Checker), Index); + Checker = nullptr; + } + } + } + catch (std::exception&) + { + while (!Checkers.empty()) + { + delete Checkers.back(); + Checkers.pop_back(); + } + throw; + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + std::unordered_map<std::unique_ptr<GcReferenceStoreCompactor>, size_t> ReferenceStoreCompactors; + ReferenceStoreCompactors.reserve(ReferencePruners.size()); + + ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); + { + SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS); + ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS))); + if (!ReferenceCheckers.empty()) + { + // Locking all references checkers so we have a steady state of which references are used + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // we delete the ReferenceCheckers + Latch WorkLeft(1); + + SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + for (auto& It : ReferenceCheckers) + { + GcReferenceChecker* Checker = It.first.get(); + size_t Index = It.second; + std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Checker, Index, &Stats, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + SCOPED_TIMER(Stats.second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->LockState(Ctx); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + ZEN_INFO("GCV2: Removing unreferenced data for {} reference pruners", ReferencePruners.size()); + if (!ReferencePruners.empty()) + { + const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> { + HashSet UnusedCids(References.begin(), References.end()); + for (const auto& It : ReferenceCheckers) + { + GcReferenceChecker* ReferenceChecker = It.first.get(); + ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids); + if (UnusedCids.empty()) + { + return {}; + } + } + return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end()); + }; + + // checking all Cids agains references in cache + // Ask stores to remove data that the ReferenceCheckers says are not referenced - this should be a lightweight operation + // that only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors + + Latch WorkLeft(1); + RwLock ReferenceStoreCompactorsLock; + + SCOPED_TIMER(Result.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + for (auto& It : ReferencePruners) + { + GcReferencePruner* Pruner = It.second.get(); + size_t Index = It.first; + GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, + Pruner, + &Stats, + &WorkLeft, + Index, + &GetUnusedReferences, + &ReferenceStoreCompactorsLock, + &ReferenceStoreCompactors]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. + std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor; + { + SCOPED_TIMER(Stats.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + ReferenceCompactor = + std::unique_ptr<GcReferenceStoreCompactor>(Pruner->RemoveUnreferencedData(Ctx, Stats, GetUnusedReferences)); + } + if (ReferenceCompactor) + { + RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock); + ReferenceStoreCompactors.insert_or_assign(std::move(ReferenceCompactor), Index); + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed + ReferenceCheckers.clear(); + } + + // Let go of the pruners + ReferencePruners.clear(); + + ZEN_INFO("GCV2: Compacting reference stores for {} reference store compactors", ReferenceStoreCompactors.size()); + if (!ReferenceStoreCompactors.empty()) + { + Latch WorkLeft(1); + + // Remove the stuff we deemed unreferenced from disk - may be heavy operation + SCOPED_TIMER(Result.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + for (auto& It : ReferenceStoreCompactors) + { + GcReferenceStoreCompactor* Compactor = It.first.get(); + size_t Index = It.second; + GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([&Ctx, Compactor, &Stats, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not. + SCOPED_TIMER(Stats.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Compactor->CompactReferenceStore(Ctx, Stats); + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); + } + + ReferenceStoreCompactors.clear(); + + ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs())); + } Result.Sum(); return Result; -#undef SCOPED_TIMER } -//////// End New GC WIP +#undef SCOPED_TIMER + +//////// End GC V2 void GcManager::AddGcContributor(GcContributor* Contributor) diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index e2e99d5a4..1b9929216 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -52,7 +52,7 @@ public: static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; } }; -//////// Begin New GC WIP +//////// Begin GC V2 struct GcSettings { @@ -62,6 +62,7 @@ struct GcSettings bool IsDeleteMode = false; bool SkipCidDelete = false; bool Verbose = false; + bool SingleThread = false; }; struct GcReferencerStats @@ -237,7 +238,7 @@ public: virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) = 0; }; -//////// End New GC WIP +//////// End GC V2 /** Garbage Collection context object */ @@ -332,7 +333,7 @@ public: GcManager(); ~GcManager(); - //////// Begin New GC WIP + //////// Begin GC V2 void AddGcReferencer(GcReferencer& Referencer); void RemoveGcReferencer(GcReferencer& Referencer); @@ -342,7 +343,7 @@ public: GcResult CollectGarbage(const GcSettings& Settings); - //////// End New GC WIP + //////// End GC V2 void AddGcContributor(GcContributor* Contributor); void RemoveGcContributor(GcContributor* Contributor); |