aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-06 11:38:56 +0100
committerGitHub <[email protected]>2023-11-06 11:38:56 +0100
commitba972a77cb75facefd3f0da962cf34ea5a391884 (patch)
tree9e0fe9ad41dd6ba20b966065eaa9f1dc214ab531
parentkeep a "null" iobuffer core to reduce redundant memory allocations (#507) (diff)
downloadzen-ba972a77cb75facefd3f0da962cf34ea5a391884.tar.xz
zen-ba972a77cb75facefd3f0da962cf34ea5a391884.zip
multithread cache bucket (#508)
* Multithread init and flush of cache bucket * tweaked threading count for bucket discovery, disklayer flush and gc v2
-rw-r--r--CHANGELOG.md1
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp87
-rw-r--r--src/zenstore/gc.cpp504
-rw-r--r--src/zenstore/include/zenstore/gc.h9
4 files changed, 324 insertions, 277 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b00754320..f80082e32 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
- Feature: New options for zen command `gc-status`
- `--details` that enables the detailed output from the last GC operation when using GC V2
- Feature: New garbage collection implementation, still in evaluation mode. Enabled by `--gc-v2` command line option
+- Improvement: Multithread init and flush of cache bucket for faster startup and exit
- Bugfix: Build script now sets up arch properly when running tests on MacOS
## 0.2.30
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index fa80ed414..c0efdc76d 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -945,6 +945,8 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
auto FlushingGuard = MakeGuard([&] { m_IsFlushing.store(false); });
+ ZEN_INFO("Flushing bucket {}", m_BucketDir);
+
m_BlockStore.Flush(/*ForceNewBlock*/ false);
m_SlogFile.Flush();
@@ -3018,6 +3020,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
// Initialize buckets
std::vector<std::filesystem::path> BadBucketDirectories;
+ std::vector<std::filesystem::path> FoundBucketDirectories;
RwLock::ExclusiveLockScope _(m_Lock);
@@ -3037,26 +3040,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
continue;
}
- auto InsertResult =
- m_Buckets.emplace(BucketName,
- std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig));
- CacheBucket& Bucket = *InsertResult.first->second;
-
- try
- {
- if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
- {
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
-
- m_Buckets.erase(InsertResult.first);
- continue;
- }
- }
- catch (const std::exception& Err)
- {
- ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
- return;
- }
+ FoundBucketDirectories.push_back(BucketPath);
ZEN_INFO("Discovered bucket '{}'", BucketName);
}
@@ -3081,6 +3065,53 @@ ZenCacheDiskLayer::DiscoverBuckets()
ZEN_WARN("bad bucket delete failed for '{}'", BadBucketPath);
}
}
+
+ RwLock SyncLock;
+
+ const size_t MaxHwTreadUse = std::thread::hardware_concurrency();
+ const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size()));
+
+ WorkerThreadPool Pool(WorkerThreadPoolCount);
+ Latch WorkLatch(1);
+ for (auto& BucketPath : FoundBucketDirectories)
+ {
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
+ try
+ {
+ std::unique_ptr<CacheBucket> NewBucket =
+ std::make_unique<CacheBucket>(m_Gc, m_TotalMemCachedSize, BucketName, m_Configuration.BucketConfig);
+
+ CacheBucket* Bucket = nullptr;
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ auto InsertResult = m_Buckets.emplace(BucketName, std::move(NewBucket));
+ Bucket = InsertResult.first->second.get();
+ }
+ ZEN_ASSERT(Bucket);
+
+ if (!Bucket->OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+
+ {
+ RwLock::ExclusiveLockScope __(SyncLock);
+ m_Buckets.erase(BucketName);
+ }
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
+ }
+ });
+ }
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
bool
@@ -3133,6 +3164,10 @@ ZenCacheDiskLayer::Flush()
{
RwLock::SharedLockScope _(m_Lock);
+ if (m_Buckets.empty())
+ {
+ return;
+ }
Buckets.reserve(m_Buckets.size());
for (auto& Kv : m_Buckets)
{
@@ -3140,11 +3175,21 @@ ZenCacheDiskLayer::Flush()
Buckets.push_back(Bucket);
}
}
+ const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
+ const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
+ WorkerThreadPool Pool(WorkerThreadPoolCount);
+ Latch WorkLatch(1);
for (auto& Bucket : Buckets)
{
- Bucket->Flush();
+ WorkLatch.AddCount(1);
+ Pool.ScheduleWork([&]() {
+ auto _ = MakeGuard([&]() { WorkLatch.CountDown(); });
+ Bucket->Flush();
+ });
}
+ WorkLatch.CountDown();
+ WorkLatch.Wait();
}
void
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index 4d146c16c..f319475f3 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -327,7 +327,7 @@ GcManager::~GcManager()
{
}
-//////// Begin New GC WIP
+//////// Begin GC V2
void
GcResult::Sum()
@@ -396,6 +396,10 @@ GcManager::RemoveGcReferenceStore(GcReferenceStore& ReferenceStore)
std::erase_if(m_GcReferenceStores, [&](GcReferenceStore* $) { return $ == &ReferenceStore; });
}
+#define SCOPED_TIMER(closure) \
+ Stopwatch $Timer##__LINE__; \
+ auto $Guard##__LINE = MakeGuard([&, &Timer = $Timer##__LINE__]() { closure })
+
GcResult
GcManager::CollectGarbage(const GcSettings& Settings)
{
@@ -408,264 +412,260 @@ GcManager::CollectGarbage(const GcSettings& Settings)
RwLock::SharedLockScope GcLock(m_Lock);
- static const bool SingleThread =
-#if ZEN_BUILD_DEBUG
- true
-#else
- false
-#endif
- ;
+ int WorkerThreadPoolCount = 0;
+ if (!Settings.SingleThread)
+ {
+ const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
+ WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, m_GcReferencers.size()));
+ }
-#define SCOPED_TIMER(closure) \
- Stopwatch $Timer##__LINE__; \
- auto $Guard##__LINE = MakeGuard([&, &Timer = $Timer##__LINE__]() { closure })
+ Result.ReferencerStats.resize(m_GcReferencers.size());
+ Result.ReferenceStoreStats.resize(m_GcReferenceStores.size());
- Result.ReferencerStats.resize(m_GcReferencers.size());
- Result.ReferenceStoreStats.resize(m_GcReferenceStores.size());
-
- WorkerThreadPool ThreadPool(SingleThread ? 0 : 8);
-
- ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
- if (!m_GcReferencers.empty())
- {
- Latch WorkLeft(1);
- // First remove any cache keys that may own references
- SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
- {
- GcReferencer* Owner = m_GcReferencers[Index];
- std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, Owner, &Stats, &WorkLeft]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- Stats.first = Owner->GetGcName(Ctx);
- SCOPED_TIMER(Stats.second.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Owner->RemoveExpiredData(Ctx, Stats.second);
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
-
- if (Ctx.Settings.SkipCidDelete)
- {
- Result.Sum();
- return Result;
- }
-
- ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size());
- std::unordered_map<size_t, std::unique_ptr<GcReferencePruner>> ReferencePruners;
- if (!m_GcReferenceStores.empty())
- {
- ReferencePruners.reserve(m_GcReferenceStores.size());
- Latch WorkLeft(1);
- RwLock ReferencePrunersLock;
- // CreateReferencePruner is usually not very heavy but big data sets change that
- SCOPED_TIMER(Result.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++)
- {
- GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index];
- std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index];
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, ReferenceStore, &Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- Stats.first = ReferenceStore->GetGcName(Ctx);
- std::unique_ptr<GcReferencePruner> ReferencePruner;
- {
- SCOPED_TIMER(Stats.second.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- // The ReferenceStore will pick a list of CId entries to check, returning a collector
- ReferencePruner = std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats.second));
- }
- if (ReferencePruner)
- {
- RwLock::ExclusiveLockScope __(ReferencePrunersLock);
- ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner));
- }
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
-
- ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size());
- std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers;
- if (!m_GcReferencers.empty())
- {
- ReferenceCheckers.reserve(m_GcReferencers.size());
- Latch WorkLeft(1);
- RwLock ReferenceCheckersLock;
- SCOPED_TIMER(Result.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- // Lock all reference owners from changing the reference data and get access to check for referenced data
- for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
- {
- GcReferencer* Referencer = m_GcReferencers[Index];
- std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, Index, &Stats, &ReferenceCheckersLock, &ReferenceCheckers]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- // The Referencer will create a reference checker that guarrantees that the references do not change as long as it lives
- std::vector<GcReferenceChecker*> Checkers;
- {
- SCOPED_TIMER(Stats.second.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checkers = Referencer->CreateReferenceCheckers(Ctx);
- }
- try
- {
- if (!Checkers.empty())
- {
- RwLock::ExclusiveLockScope __(ReferenceCheckersLock);
- for (auto& Checker : Checkers)
- {
- ReferenceCheckers.insert_or_assign(std::unique_ptr<GcReferenceChecker>(Checker), Index);
- Checker = nullptr;
- }
- }
- }
- catch (std::exception&)
- {
- while (!Checkers.empty())
- {
- delete Checkers.back();
- Checkers.pop_back();
- }
- throw;
- }
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
-
- std::unordered_map<std::unique_ptr<GcReferenceStoreCompactor>, size_t> ReferenceStoreCompactors;
- ReferenceStoreCompactors.reserve(ReferencePruners.size());
-
- ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
- {
- SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS);
- ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
- if (!ReferenceCheckers.empty())
- {
- // Locking all references checkers so we have a steady state of which references are used
- // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
- // we delete the ReferenceCheckers
- Latch WorkLeft(1);
-
- SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- for (auto& It : ReferenceCheckers)
- {
- GcReferenceChecker* Checker = It.first.get();
- size_t Index = It.second;
- std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, Checker, Index, &Stats, &WorkLeft]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- SCOPED_TIMER(Stats.second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->LockState(Ctx);
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
-
- ZEN_INFO("GCV2: Removing unreferenced data for {} reference pruners", ReferencePruners.size());
- if (!ReferencePruners.empty())
- {
- const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> {
- HashSet UnusedCids(References.begin(), References.end());
- for (const auto& It : ReferenceCheckers)
- {
- GcReferenceChecker* ReferenceChecker = It.first.get();
- ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids);
- if (UnusedCids.empty())
- {
- return {};
- }
- }
- return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end());
- };
-
- // checking all Cids agains references in cache
- // Ask stores to remove data that the ReferenceCheckers says are not referenced - this should be a lightweight operation
- // that only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors
-
- Latch WorkLeft(1);
- RwLock ReferenceStoreCompactorsLock;
-
- SCOPED_TIMER(Result.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- for (auto& It : ReferencePruners)
- {
- GcReferencePruner* Pruner = It.second.get();
- size_t Index = It.first;
- GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second;
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx,
- Pruner,
- &Stats,
- &WorkLeft,
- Index,
- &GetUnusedReferences,
- &ReferenceStoreCompactorsLock,
- &ReferenceStoreCompactors]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not.
- std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor;
- {
- SCOPED_TIMER(Stats.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- ReferenceCompactor =
- std::unique_ptr<GcReferenceStoreCompactor>(Pruner->RemoveUnreferencedData(Ctx, Stats, GetUnusedReferences));
- }
- if (ReferenceCompactor)
- {
- RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock);
- ReferenceStoreCompactors.insert_or_assign(std::move(ReferenceCompactor), Index);
- }
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
- // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed
- ReferenceCheckers.clear();
- }
-
- // Let go of the pruners
- ReferencePruners.clear();
-
- ZEN_INFO("GCV2: Compacting reference stores for {} reference store compactors", ReferenceStoreCompactors.size());
- if (!ReferenceStoreCompactors.empty())
- {
- Latch WorkLeft(1);
-
- // Remove the stuff we deemed unreferenced from disk - may be heavy operation
- SCOPED_TIMER(Result.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- for (auto& It : ReferenceStoreCompactors)
- {
- GcReferenceStoreCompactor* Compactor = It.first.get();
- size_t Index = It.second;
- GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second;
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([&Ctx, Compactor, &Stats, &WorkLeft]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not.
- SCOPED_TIMER(Stats.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Compactor->CompactReferenceStore(Ctx, Stats);
- });
- }
- WorkLeft.CountDown();
- WorkLeft.Wait();
- }
-
- ReferenceStoreCompactors.clear();
-
- ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
- }
+ WorkerThreadPool ThreadPool(WorkerThreadPoolCount);
+
+ ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
+ if (!m_GcReferencers.empty())
+ {
+ Latch WorkLeft(1);
+ // First remove any cache keys that may own references
+ SCOPED_TIMER(Result.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
+ {
+ GcReferencer* Owner = m_GcReferencers[Index];
+ std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Owner, &Stats, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ Stats.first = Owner->GetGcName(Ctx);
+ SCOPED_TIMER(Stats.second.RemoveExpiredDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Owner->RemoveExpiredData(Ctx, Stats.second);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ if (Ctx.Settings.SkipCidDelete)
+ {
+ Result.Sum();
+ return Result;
+ }
+
+ ZEN_INFO("GCV2: Creating reference pruners from {} reference stores", m_GcReferenceStores.size());
+ std::unordered_map<size_t, std::unique_ptr<GcReferencePruner>> ReferencePruners;
+ if (!m_GcReferenceStores.empty())
+ {
+ ReferencePruners.reserve(m_GcReferenceStores.size());
+ Latch WorkLeft(1);
+ RwLock ReferencePrunersLock;
+ // CreateReferencePruner is usually not very heavy but big data sets change that
+ SCOPED_TIMER(Result.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ for (size_t Index = 0; Index < m_GcReferenceStores.size(); Index++)
+ {
+ GcReferenceStore* ReferenceStore = m_GcReferenceStores[Index];
+ std::pair<std::string, GcReferenceStoreStats>& Stats = Result.ReferenceStoreStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, ReferenceStore, &Stats, Index, &WorkLeft, &ReferencePrunersLock, &ReferencePruners]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ Stats.first = ReferenceStore->GetGcName(Ctx);
+ std::unique_ptr<GcReferencePruner> ReferencePruner;
+ {
+ SCOPED_TIMER(Stats.second.CreateReferencePrunerMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ // The ReferenceStore will pick a list of CId entries to check, returning a collector
+ ReferencePruner = std::unique_ptr<GcReferencePruner>(ReferenceStore->CreateReferencePruner(Ctx, Stats.second));
+ }
+ if (ReferencePruner)
+ {
+ RwLock::ExclusiveLockScope __(ReferencePrunersLock);
+ ReferencePruners.insert_or_assign(Index, std::move(ReferencePruner));
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ ZEN_INFO("GCV2: Creating reference checkers from {} referencers", m_GcReferencers.size());
+ std::unordered_map<std::unique_ptr<GcReferenceChecker>, size_t> ReferenceCheckers;
+ if (!m_GcReferencers.empty())
+ {
+ ReferenceCheckers.reserve(m_GcReferencers.size());
+ Latch WorkLeft(1);
+ RwLock ReferenceCheckersLock;
+ SCOPED_TIMER(Result.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ // Lock all reference owners from changing the reference data and get access to check for referenced data
+ for (size_t Index = 0; Index < m_GcReferencers.size(); Index++)
+ {
+ GcReferencer* Referencer = m_GcReferencers[Index];
+ std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, &WorkLeft, Referencer, Index, &Stats, &ReferenceCheckersLock, &ReferenceCheckers]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+                // The Referencer will create a reference checker that guarantees that the references do not change as long as it lives
+ std::vector<GcReferenceChecker*> Checkers;
+ {
+ SCOPED_TIMER(Stats.second.CreateReferenceCheckersMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checkers = Referencer->CreateReferenceCheckers(Ctx);
+ }
+ try
+ {
+ if (!Checkers.empty())
+ {
+ RwLock::ExclusiveLockScope __(ReferenceCheckersLock);
+ for (auto& Checker : Checkers)
+ {
+ ReferenceCheckers.insert_or_assign(std::unique_ptr<GcReferenceChecker>(Checker), Index);
+ Checker = nullptr;
+ }
+ }
+ }
+ catch (std::exception&)
+ {
+ while (!Checkers.empty())
+ {
+ delete Checkers.back();
+ Checkers.pop_back();
+ }
+ throw;
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ std::unordered_map<std::unique_ptr<GcReferenceStoreCompactor>, size_t> ReferenceStoreCompactors;
+ ReferenceStoreCompactors.reserve(ReferencePruners.size());
+
+ ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
+ {
+ SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS);
+ ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
+ if (!ReferenceCheckers.empty())
+ {
+ // Locking all references checkers so we have a steady state of which references are used
+ // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
+ // we delete the ReferenceCheckers
+ Latch WorkLeft(1);
+
+ SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ for (auto& It : ReferenceCheckers)
+ {
+ GcReferenceChecker* Checker = It.first.get();
+ size_t Index = It.second;
+ std::pair<std::string, GcReferencerStats>& Stats = Result.ReferencerStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Checker, Index, &Stats, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ SCOPED_TIMER(Stats.second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->LockState(Ctx);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ ZEN_INFO("GCV2: Removing unreferenced data for {} reference pruners", ReferencePruners.size());
+ if (!ReferencePruners.empty())
+ {
+ const auto GetUnusedReferences = [&ReferenceCheckers, &Ctx](std::span<IoHash> References) -> std::vector<IoHash> {
+ HashSet UnusedCids(References.begin(), References.end());
+ for (const auto& It : ReferenceCheckers)
+ {
+ GcReferenceChecker* ReferenceChecker = It.first.get();
+ ReferenceChecker->RemoveUsedReferencesFromSet(Ctx, UnusedCids);
+ if (UnusedCids.empty())
+ {
+ return {};
+ }
+ }
+ return std::vector<IoHash>(UnusedCids.begin(), UnusedCids.end());
+ };
+
+            // checking all Cids against references in cache
+            // Ask stores to remove data that the ReferenceCheckers say are not referenced - this should be a lightweight operation
+ // that only updates in-memory index, actual disk changes should be done by the ReferenceStoreCompactors
+
+ Latch WorkLeft(1);
+ RwLock ReferenceStoreCompactorsLock;
+
+ SCOPED_TIMER(Result.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ for (auto& It : ReferencePruners)
+ {
+ GcReferencePruner* Pruner = It.second.get();
+ size_t Index = It.first;
+ GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second;
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx,
+ Pruner,
+ &Stats,
+ &WorkLeft,
+ Index,
+ &GetUnusedReferences,
+ &ReferenceStoreCompactorsLock,
+ &ReferenceStoreCompactors]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ // Go through all the ReferenceCheckers to see if the list of Cids the collector selected are referenced or not.
+ std::unique_ptr<GcReferenceStoreCompactor> ReferenceCompactor;
+ {
+ SCOPED_TIMER(Stats.RemoveUnreferencedDataMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ ReferenceCompactor =
+ std::unique_ptr<GcReferenceStoreCompactor>(Pruner->RemoveUnreferencedData(Ctx, Stats, GetUnusedReferences));
+ }
+ if (ReferenceCompactor)
+ {
+ RwLock::ExclusiveLockScope __(ReferenceStoreCompactorsLock);
+ ReferenceStoreCompactors.insert_or_assign(std::move(ReferenceCompactor), Index);
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+ // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed
+ ReferenceCheckers.clear();
+ }
+
+ // Let go of the pruners
+ ReferencePruners.clear();
+
+ ZEN_INFO("GCV2: Compacting reference stores for {} reference store compactors", ReferenceStoreCompactors.size());
+ if (!ReferenceStoreCompactors.empty())
+ {
+ Latch WorkLeft(1);
+
+ // Remove the stuff we deemed unreferenced from disk - may be heavy operation
+ SCOPED_TIMER(Result.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ for (auto& It : ReferenceStoreCompactors)
+ {
+ GcReferenceStoreCompactor* Compactor = It.first.get();
+ size_t Index = It.second;
+ GcReferenceStoreStats& Stats = Result.ReferenceStoreStats[Index].second;
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([&Ctx, Compactor, &Stats, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+                // Apply the on-disk changes for the data the pruning step deemed unreferenced.
+ SCOPED_TIMER(Stats.CompactReferenceStoreMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Compactor->CompactReferenceStore(Ctx, Stats);
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ }
+
+ ReferenceStoreCompactors.clear();
+
+ ZEN_INFO("GCV2: Completed in {}", NiceTimeSpanMs(TotalTimer.GetElapsedTimeMs()));
+ }
Result.Sum();
return Result;
-#undef SCOPED_TIMER
}
-//////// End New GC WIP
+#undef SCOPED_TIMER
+
+//////// End GC V2
void
GcManager::AddGcContributor(GcContributor* Contributor)
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index e2e99d5a4..1b9929216 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -52,7 +52,7 @@ public:
static TimePoint TimePointFromTick(const Tick TickCount) { return TimePoint{Duration{TickCount}}; }
};
-//////// Begin New GC WIP
+//////// Begin GC V2
struct GcSettings
{
@@ -62,6 +62,7 @@ struct GcSettings
bool IsDeleteMode = false;
bool SkipCidDelete = false;
bool Verbose = false;
+ bool SingleThread = false;
};
struct GcReferencerStats
@@ -237,7 +238,7 @@ public:
virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) = 0;
};
-//////// End New GC WIP
+//////// End GC V2
/** Garbage Collection context object
*/
@@ -332,7 +333,7 @@ public:
GcManager();
~GcManager();
- //////// Begin New GC WIP
+ //////// Begin GC V2
void AddGcReferencer(GcReferencer& Referencer);
void RemoveGcReferencer(GcReferencer& Referencer);
@@ -342,7 +343,7 @@ public:
GcResult CollectGarbage(const GcSettings& Settings);
- //////// End New GC WIP
+ //////// End GC V2
void AddGcContributor(GcContributor* Contributor);
void RemoveGcContributor(GcContributor* Contributor);