diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-13 08:53:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-13 08:53:01 +0200 |
| commit | b71d52375e41a084e661d0f55f044ca8982312a4 (patch) | |
| tree | f44e74a13e29d50ab36d1eebfdb4c7e606d879a9 /src/zenstore | |
| parent | 5.5.3-pre1 (diff) | |
| download | zen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip | |
Make sure we monitor for new projects, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 305 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 235 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 124 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 12 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 25 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 45 |
6 files changed, 571 insertions, 175 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9dd2e4a67..f865e1c3c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } +bool +ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences) +{ + auto GetAttachments = [&](const void* CbObjectData) { + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); + }; + + std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::vector<BlockStoreLocation> InlineLocations; + std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; + + { + std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; + + for (const auto& Entry : m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(std::make_pair(Key, Loc)); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); + size_t ChunkIndex = InlineLocations.size(); + InlineLocations.push_back(ChunkLocation); + InlineKeys.push_back(Key); + if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) + { + InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); + } + else + { + BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); + 
InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); + } + } + } + + for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) + { + ZEN_ASSERT(!ChunkIndexes.empty()); + + bool Continue = m_BlockStore.IterateBlock( + InlineLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ZEN_UNUSED(ChunkIndex, Size); + GetAttachments(Data); + return !Ctx.IsCancelledFlag.load(); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ZEN_UNUSED(ChunkIndex); + GetAttachments(File.GetChunk(Offset, Size).GetData()); + return !Ctx.IsCancelledFlag.load(); + }); + + if (!Continue && Ctx.IsCancelledFlag.load()) + { + return false; + } + } + } + for (const auto& It : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first); + if (Buffer) + { + GetAttachments(Buffer.GetData()); + } + } + return true; +} + class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; @@ -3396,7 +3491,6 @@ public: { try { - m_IndexLock.reset(); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) @@ -3419,114 +3513,25 @@ public: } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - auto GetAttachments = [&](const void* CbObjectData) { - CbObjectView Obj(CbObjectData); - Obj.IterateAttachments([&](CbFieldView Field) { m_PrecachedReferences.emplace_back(Field.AsAttachment()); }); - }; - - // Refresh cache - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = 
std::make_unique<HashSet>(); }); - std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; - { - std::vector<IoHash> InlineKeys; - std::vector<BlockStoreLocation> InlineLocations; - std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; - - { - std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; - - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - for (const auto& Entry : m_CacheBucket.m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - IndexLock.ReleaseNow(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - - PayloadIndex EntryIndex = Entry.second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(std::make_pair(Key, Loc)); - continue; - } - - BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - size_t ChunkIndex = InlineLocations.size(); - InlineLocations.push_back(ChunkLocation); - InlineKeys.push_back(Key); - if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) - { - InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); - } - else - { - BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); - InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); - } - } - } - - for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) - { - ZEN_ASSERT(!ChunkIndexes.empty()); - - bool Continue = m_CacheBucket.m_BlockStore.IterateBlock( - InlineLocations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ZEN_UNUSED(ChunkIndex, Size); - GetAttachments(Data); - return !Ctx.IsCancelledFlag.load(); - }, - 
[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ZEN_UNUSED(ChunkIndex); - GetAttachments(File.GetChunk(Offset, Size).GetData()); - return !Ctx.IsCancelledFlag.load(); - }); - - if (!Continue && Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - } - } - for (const auto& It : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); + IndexLock.ReleaseNow(); - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); - if (Buffer) - { - GetAttachments(Buffer.GetData()); - } - } + if (!Continue) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3536,32 +3541,28 @@ public: } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size() + m_UncachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); if (Ctx.IsCancelledFlag.load()) { - m_UncachedReferences = {}; - m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + m_References = {}; + m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); - 
m_UncachedReferences.reserve(AddedReferences.size()); - m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end()); + m_References.reserve(m_References.size() + AddedReferences.size()); + m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); AddedReferences = {}; } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3576,18 +3577,7 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_PrecachedReferences) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - - for (const IoHash& ReferenceHash : m_UncachedReferences) + for (const IoHash& ReferenceHash : m_References) { if (IoCids.erase(ReferenceHash) == 1) { @@ -3598,10 +3588,8 @@ public: } } } - CacheBucket& m_CacheBucket; - std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; - std::vector<IoHash> m_PrecachedReferences; - std::vector<IoHash> m_UncachedReferences; + CacheBucket& m_CacheBucket; + std::vector<IoHash> m_References; }; std::vector<GcReferenceChecker*> @@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Reset(m_FreeMemCachedPayloads); } +RwLock::SharedLockScope +ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock() +{ + return RwLock::SharedLockScope(m_IndexLock); +} + #if ZEN_WITH_TESTS void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) @@ -3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); - + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedBuckets) + { + m_CapturedBuckets->push_back(std::string(BucketName)); + } + 
}); return Result; } @@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +std::vector<RwLock::SharedLockScope> +ZenCacheDiskLayer::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + for (auto& Kv : m_Buckets) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + +void +ZenCacheDiskLayer::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedBuckets); + m_CapturedBuckets = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedBuckets); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ZenCacheDiskLayer::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedBuckets); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedBuckets.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheDiskLayer::GetCapturedBuckets() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedBuckets) + { + return *m_CapturedBuckets; + } + return {}; +} + void ZenCacheDiskLayer::MemCacheTrim() { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index d9c2d3e59..d7d594af7 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -375,6 +375,23 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); } +std::vector<RwLock::SharedLockScope> +ZenCacheNamespace::GetGcReferencerLocks() +{ + return m_DiskLayer.GetGcReferencerLocks(); +} + +void +ZenCacheNamespace::EnableUpdateCapture() +{ + m_DiskLayer.EnableUpdateCapture(); +} +void +ZenCacheNamespace::DisableUpdateCapture() +{ + 
m_DiskLayer.DisableUpdateCapture(); +} + #if ZEN_WITH_TESTS void ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time) @@ -439,11 +456,15 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), m_Configuration.NamespaceConfig); } + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); } ZenCacheStore::~ZenCacheStore() { ZEN_INFO("closing cache store at '{}'", m_BasePath); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false}); m_Namespaces.clear(); } @@ -844,6 +865,14 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) m_JobQueue, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), m_Configuration.NamespaceConfig)); + + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedNamespaces) + { + m_CapturedNamespaces->push_back(std::string(Namespace)); + } + }); + return NewNamespace.first->second.get(); } @@ -1003,6 +1032,212 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu return {}; } +std::vector<RwLock::SharedLockScope> +ZenCacheStore::LockState(GcCtx&) +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock)); + for (auto& NamespaceIt : m_Namespaces) + { + std::vector<RwLock::SharedLockScope> NamespaceLocks = NamespaceIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(NamespaceLocks.begin()); It != std::make_move_iterator(NamespaceLocks.end()); It++) + { + Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + +void +ZenCacheStore::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedNamespaces); + m_CapturedNamespaces = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedNamespaces); + } + 
m_UpdateCaptureRefCounter++; + }); + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->EnableUpdateCapture(); + } +} + +void +ZenCacheStore::DisableUpdateCapture() +{ + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->DisableUpdateCapture(); + } + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedNamespaces); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedNamespaces.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheStore::GetCapturedNamespaces() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedNamespaces) + { + return *m_CapturedNamespaces; + } + return {}; +} + +std::string +ZenCacheStore::GetGcName(GcCtx&) +{ + return fmt::format("zencachestore: '{}'", m_BasePath.string()); +} + +GcStoreCompactor* +ZenCacheStore::RemoveExpiredData(GcCtx&, GcStats&) +{ + return nullptr; +} + +class CacheStoreReferenceChecker : public GcReferenceChecker +{ +public: + CacheStoreReferenceChecker(ZenCacheStore& InCacheStore) : m_CacheStore(InCacheStore) { m_CacheStore.EnableUpdateCapture(); } + + virtual ~CacheStoreReferenceChecker() + { + try + { + m_CacheStore.DisableUpdateCapture(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheStoreReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual std::string GetGcName(GcCtx&) override { return "cachestore"; } + virtual void PreCache(GcCtx&) override {} + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::UpdateLockedState"); + + Stopwatch Timer; + + std::vector<ZenCacheDiskLayer::CacheBucket*> AddedBuckets; + + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachestore [LOCKSTATE] '{}': found {} references in {} in {} new buckets", + "cachestore", + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + AddedBuckets.size()); + }); + + std::vector<std::string> 
AddedNamespaces = m_CacheStore.GetCapturedNamespaces(); + + for (const std::string& AddedNamespace : AddedNamespaces) + { + if (auto It = m_CacheStore.m_Namespaces.find(AddedNamespace); It != m_CacheStore.m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + for (auto& BucketKV : Namespace.m_DiskLayer.m_Buckets) + { + AddedBuckets.push_back(BucketKV.second.get()); + } + } + } + for (auto& NamepaceKV : m_CacheStore.m_Namespaces) + { + ZenCacheNamespace& Namespace = *NamepaceKV.second; + std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBuckets(); + for (const std::string& AddedBucketName : NamespaceAddedBuckets) + { + if (auto It = Namespace.m_DiskLayer.m_Buckets.find(AddedBucketName); It != Namespace.m_DiskLayer.m_Buckets.end()) + { + AddedBuckets.push_back(It->second.get()); + } + } + } + + for (ZenCacheDiskLayer::CacheBucket* Bucket : AddedBuckets) + { + bool Continue = Bucket->GetReferencesLocked(Ctx, m_References); + if (!Continue) + { + break; + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Z$::RemoveUsedReferencesFromSet"); + + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "projectstore", + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + for (const IoHash& ReferenceHash : m_References) + { + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } + } + } + +private: + ZenCacheStore& m_CacheStore; + std::vector<IoHash> m_References; +}; + +std::vector<GcReferenceChecker*> +ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + 
return; + } + ZEN_INFO("GCV2: cachestore [CREATE CHECKERS] '{}': completed in {}", m_BasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new CacheStoreReferenceChecker(*this)); + return Checkers; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index e8cf6ec5e..8db34b9c5 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -382,7 +382,7 @@ WriteReferencerStats(CbObjectWriter& Writer, const GcReferencerStats& Stats, boo Writer << "CreateReferenceCheckers" << ToTimeSpan(Stats.CreateReferenceCheckersMS); Writer << "PreCacheState" << ToTimeSpan(Stats.PreCacheStateMS); - Writer << "LockState" << ToTimeSpan(Stats.LockStateMS); + Writer << "UpdateLockedState" << ToTimeSpan(Stats.UpdateLockedStateMS); Writer << "Elapsed" << ToTimeSpan(Stats.ElapsedMS); }; @@ -452,6 +452,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable Writer << "CreateReferenceCheckers" << ToTimeSpan(Result.CreateReferenceCheckersMS); Writer << "PreCacheState" << ToTimeSpan(Result.PreCacheStateMS); Writer << "LockState" << ToTimeSpan(Result.LockStateMS); + Writer << "UpdateLockedState" << ToTimeSpan(Result.UpdateLockedStateMS); Writer << "CreateReferencePruners" << ToTimeSpan(Result.CreateReferencePrunersMS); Writer << "RemoveUnreferencedData" << ToTimeSpan(Result.RemoveUnreferencedDataMS); @@ -510,7 +511,7 @@ void Sum(GcReferencerStats& Stat) { Stat.ElapsedMS = Stat.RemoveExpiredDataStats.ElapsedMS + Stat.CompactStoreStats.ElapsedMS + Stat.CreateReferenceCheckersMS + - Stat.PreCacheStateMS + Stat.LockStateMS; + Stat.PreCacheStateMS + Stat.UpdateLockedStateMS; } void @@ -521,7 +522,7 @@ Add(GcReferencerStats& Sum, const GcReferencerStats& Sub) Sum.CreateReferenceCheckersMS += Sub.CreateReferenceCheckersMS; Sum.PreCacheStateMS += Sub.PreCacheStateMS; - Sum.LockStateMS += 
Sub.LockStateMS; + Sum.UpdateLockedStateMS += Sub.UpdateLockedStateMS; Sum.ElapsedMS += Sub.ElapsedMS; } @@ -584,6 +585,19 @@ GcManager::RemoveGcReferencer(GcReferencer& Referencer) } void +GcManager::AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcReferencerLockers.push_back(&ReferenceLocker); +} +void +GcManager::RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcReferencerLockers, [&](GcReferenceLocker* $) { return $ == &ReferenceLocker; }); +} + +void GcManager::AddGcReferenceStore(GcReferenceStore& ReferenceStore) { RwLock::ExclusiveLockScope _(m_Lock); @@ -879,59 +893,90 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } + std::vector<RwLock::SharedLockScope> LockerScopes; SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS); ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS))); { - ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); if (!ReferenceCheckers.empty()) { if (CheckGCCancel()) { return Sum(Result, true); } - ZEN_TRACE_CPU("GcV2::LockState"); - - // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until - // we delete the ReferenceCheckers - Latch WorkLeft(1); - + ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); { - SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); - if (Ctx.Settings.Verbose) { - ZEN_INFO("GCV2: Locked state using {} reference checkers in {}", - ReferenceCheckers.size(), - NiceTimeSpanMs(Result.LockStateMS.count())); - }); - for (auto& It : ReferenceCheckers) + ZEN_TRACE_CPU("GcV2::LockReferencers"); + // From this point we have blocked all writes to all References 
(DiskBucket/ProjectStore) until + // we delete the ReferenceLockers + Latch WorkLeft(1); { - if (CheckGCCancel()) + SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); + if (Ctx.Settings.Verbose) { + ZEN_INFO("GCV2: Locked referencers using {} reference lockers in {}", + ReferenceCheckers.size(), + NiceTimeSpanMs(Result.LockStateMS.count())); + }); + for (GcReferenceLocker* ReferenceLocker : m_GcReferencerLockers) { - WorkLeft.CountDown(); - WorkLeft.Wait(); - return Sum(Result, true); - } - - GcReferenceChecker* Checker = It.first.get(); - size_t Index = It.second; - std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - try + std::vector<RwLock::SharedLockScope> LockScopes = ReferenceLocker->LockState(Ctx); + for (auto It = std::make_move_iterator(LockScopes.begin()); + It != std::make_move_iterator(LockScopes.end()); + It++) { - SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->LockState(Ctx); + LockerScopes.emplace_back(std::move(*It)); } - catch (const std::exception& Ex) + } + } + } + ZEN_INFO("GCV2: Updating locked state for {} reference checkers", ReferenceCheckers.size()); + { + ZEN_TRACE_CPU("GcV2::UpdateLockedState"); + + // Locking all references checkers so we have a steady state of which references are used + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // we delete the ReferenceCheckers + Latch WorkLeft(1); + + { + SCOPED_TIMER(Result.UpdateLockedStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); + if (Ctx.Settings.Verbose) { + ZEN_INFO("GCV2: Updated locked state using {} reference checkers in {}", + ReferenceCheckers.size(), + NiceTimeSpanMs(Result.UpdateLockedStateMS.count())); + }); + for (auto& It : 
ReferenceCheckers) + { + if (CheckGCCancel()) { - ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); - SetCancelGC(true); + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); } - }); + + GcReferenceChecker* Checker = It.first.get(); + size_t Index = It.second; + std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try + { + SCOPED_TIMER(Stats->second.UpdateLockedStateMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->UpdateLockedState(Ctx); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + SetCancelGC(true); + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); } - WorkLeft.CountDown(); - WorkLeft.Wait(); } } } @@ -1020,6 +1065,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed + LockerScopes.clear(); ReferenceCheckers.clear(); ReferencePruners.clear(); } diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 9dee4d3f7..537f4396a 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -197,6 +197,12 @@ public: CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedBuckets(); + #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, 
GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -227,6 +233,8 @@ public: void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); + RwLock::SharedLockScope GetGcReferencerLock(); + bool GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences); inline GcStorageSize StorageSize() const { @@ -461,12 +469,16 @@ private: mutable RwLock m_Lock; std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedBuckets; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; friend class DiskBucketStoreCompactor; friend class DiskBucketReferenceChecker; + friend class CacheStoreReferenceChecker; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 7460d01ce..9160db667 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -48,6 +48,7 @@ class JobQueue; projects from each other. 
*/ + class ZenCacheNamespace final : public GcStorage, public GcContributor { public: @@ -118,6 +119,11 @@ public: CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -137,6 +143,8 @@ private: ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; + + friend class CacheStoreReferenceChecker; }; /** Cache store interface @@ -145,7 +153,7 @@ private: */ -class ZenCacheStore final : public RefCounted, public StatsProvider +class ZenCacheStore final : public RefCounted, public StatsProvider, public GcReferencer, public GcReferenceLocker { public: static constexpr std::string_view DefaultNamespace = @@ -271,6 +279,16 @@ public: // StatsProvider virtual void ReportMetrics(StatsMetrics& Statsd) override; + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedNamespaces(); + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); @@ -283,6 +301,9 @@ private: mutable RwLock m_NamespacesLock; NamespaceMap m_Namespaces; std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedNamespaces; 
GcManager& m_Gc; JobQueue& m_JobQueue; @@ -314,6 +335,8 @@ private: std::thread m_AsyncLoggingThread; std::atomic_bool m_WriteLogEnabled; std::atomic_bool m_AccessLogEnabled; + + friend class CacheStoreReferenceChecker; }; void structured_cachestore_forcelink(); diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 5262c6d2e..c3a71baa6 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -88,7 +88,7 @@ struct GcReferencerStats std::chrono::milliseconds CreateReferenceCheckersMS = {}; std::chrono::milliseconds PreCacheStateMS = {}; - std::chrono::milliseconds LockStateMS = {}; + std::chrono::milliseconds UpdateLockedStateMS = {}; std::chrono::milliseconds ElapsedMS = {}; }; @@ -115,6 +115,7 @@ struct GcResult std::chrono::milliseconds CreateReferenceCheckersMS = {}; std::chrono::milliseconds PreCacheStateMS = {}; std::chrono::milliseconds LockStateMS = {}; + std::chrono::milliseconds UpdateLockedStateMS = {}; std::chrono::milliseconds CreateReferencePrunersMS = {}; std::chrono::milliseconds RemoveUnreferencedDataMS = {}; @@ -177,13 +178,19 @@ public: virtual std::string GetGcName(GcCtx& Ctx) = 0; + // Read as much of the current state - nothing is locked for you here so you need to lock as appropriate virtual void PreCache(GcCtx& Ctx) = 0; - // Lock the state and make sure no references changes, usually a read-lock is taken until the destruction - // of the instance. Called once before any calls to RemoveUsedReferencesFromSet - // The implementation should be as fast as possible as LockState is part of a stop the world (from changes) - // until all instances of GcReferenceChecker are deleted - virtual void LockState(GcCtx& Ctx) = 0; + // Update the state after all ReferenceCheckers has completed PreCache and all ReferenceLockers has + // completed their LockState operation. + // At this stage all data that UpdateLockedState needs to touch should be locked by the ReferenceLockers. 
+ // *IMPORTANT* Do *not* take any locks (shared or exclusive) in this code. + // This is because we need to acquire the locks in an ordered manner and not end up in a deadlock due to other code + // trying to get exclusive locks halfway through our execution. + // Called once before any calls to RemoveUsedReferencesFromSet. + // The implementation should be as fast as possible as UpdateLockedState is part of a stop the world (from changes) + // until all instances of GcReferenceChecker UpdateLockedState are completed + virtual void UpdateLockedState(GcCtx& Ctx) = 0; // Go through IoCids and see which ones are referenced. If it is the reference must be removed from IoCids // This function should use pre-cached information on what is referenced as we are in stop the world mode @@ -191,6 +198,22 @@ public: }; /** + * @brief An interface to implement a lock for Stop The World (from writing new data) + * + * This interface is registered/unregistered to GcManager via AddGcReferenceLocker() and RemoveGcReferenceLocker() + */ +class GcReferenceLocker +{ +public: + virtual ~GcReferenceLocker() = default; + + // Take all the locks needed to execute UpdateLockedState for all the GcReferenceCheckers in your domain + // Once all the GcReferenceCheckers have executed UpdateLockedState and RemoveUsedReferencesFromSet for all + // domains has completed, the locks will be disposed and writes are allowed once again + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) = 0; +}; + +/** * @brief Interface to handle GC of data that references Cid data * * This interface is registered/unregistered to GcManager via AddGcReferencer() and RemoveGcReferencer() @@ -203,7 +226,7 @@ protected: public: virtual std::string GetGcName(GcCtx& Ctx) = 0; - // Remove expired data based on either GcCtx::Settings CacheExpireTime/ProjectExpireTime + // Remove expired data based on either GcCtx::Settings CacheExpireTime or ProjectExpireTime virtual GcStoreCompactor* 
RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) = 0; // Create 0-n GcReferenceChecker for this GcReferencer. Caller will manage lifetime of @@ -350,6 +373,9 @@ public: void AddGcReferencer(GcReferencer& Referencer); void RemoveGcReferencer(GcReferencer& Referencer); + void AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker); + void RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker); + void AddGcReferenceStore(GcReferenceStore& ReferenceStore); void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore); @@ -382,8 +408,9 @@ private: CidStore* m_CidStore = nullptr; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; - std::vector<GcReferencer*> m_GcReferencers; - std::vector<GcReferenceStore*> m_GcReferenceStores; + std::vector<GcReferencer*> m_GcReferencers; + std::vector<GcReferenceLocker*> m_GcReferencerLockers; + std::vector<GcReferenceStore*> m_GcReferenceStores; std::atomic_bool m_CancelGC{false}; }; |