diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-13 08:53:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-13 08:53:01 +0200 |
| commit | b71d52375e41a084e661d0f55f044ca8982312a4 (patch) | |
| tree | f44e74a13e29d50ab36d1eebfdb4c7e606d879a9 /src/zenstore/cache | |
| parent | 5.5.3-pre1 (diff) | |
| download | zen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip | |
Make sure we monitor for new project, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
Diffstat (limited to 'src/zenstore/cache')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 305 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 235 |
2 files changed, 414 insertions, 126 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9dd2e4a67..f865e1c3c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } +bool +ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences) +{ + auto GetAttachments = [&](const void* CbObjectData) { + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); + }; + + std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::vector<BlockStoreLocation> InlineLocations; + std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; + + { + std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; + + for (const auto& Entry : m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(std::make_pair(Key, Loc)); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); + size_t ChunkIndex = InlineLocations.size(); + InlineLocations.push_back(ChunkLocation); + InlineKeys.push_back(Key); + if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) + { + InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); + } + else + { + BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); + InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); + } + } + } + + for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) + { + ZEN_ASSERT(!ChunkIndexes.empty()); + + bool Continue = m_BlockStore.IterateBlock( + InlineLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ZEN_UNUSED(ChunkIndex, Size); + GetAttachments(Data); + return !Ctx.IsCancelledFlag.load(); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ZEN_UNUSED(ChunkIndex); + GetAttachments(File.GetChunk(Offset, Size).GetData()); + return !Ctx.IsCancelledFlag.load(); + }); + + if (!Continue && Ctx.IsCancelledFlag.load()) + { + return false; + } + } + } + for (const auto& It : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first); + if (Buffer) + { + GetAttachments(Buffer.GetData()); + } + } + return true; +} + class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; @@ -3396,7 +3491,6 @@ public: { try { - m_IndexLock.reset(); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) @@ -3419,114 +3513,25 @@ public: } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - auto GetAttachments = [&](const void* CbObjectData) { - CbObjectView Obj(CbObjectData); - Obj.IterateAttachments([&](CbFieldView Field) { m_PrecachedReferences.emplace_back(Field.AsAttachment()); }); - }; - - // Refresh cache - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); - std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; - { - std::vector<IoHash> InlineKeys; - std::vector<BlockStoreLocation> InlineLocations; - std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; - - { - std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; - - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - for (const auto& Entry : m_CacheBucket.m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - IndexLock.ReleaseNow(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - - PayloadIndex EntryIndex = Entry.second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(std::make_pair(Key, Loc)); - continue; - } - - BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - size_t ChunkIndex = InlineLocations.size(); - InlineLocations.push_back(ChunkLocation); - InlineKeys.push_back(Key); - if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) - { - InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); - } - else - { - BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); - InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); - } - } - } - - for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) - { - ZEN_ASSERT(!ChunkIndexes.empty()); - - bool Continue = m_CacheBucket.m_BlockStore.IterateBlock( - InlineLocations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ZEN_UNUSED(ChunkIndex, Size); - GetAttachments(Data); - return !Ctx.IsCancelledFlag.load(); - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ZEN_UNUSED(ChunkIndex); - GetAttachments(File.GetChunk(Offset, Size).GetData()); - return !Ctx.IsCancelledFlag.load(); - }); - - if (!Continue && Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - } - } - for (const auto& It : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); + IndexLock.ReleaseNow(); - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); - if (Buffer) - { - GetAttachments(Buffer.GetData()); - } - } + if (!Continue) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3536,32 +3541,28 @@ public: } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size() + m_UncachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); if (Ctx.IsCancelledFlag.load()) { - m_UncachedReferences = {}; - m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + m_References = {}; + m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); - m_UncachedReferences.reserve(AddedReferences.size()); - m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end()); + m_References.reserve(m_References.size() + AddedReferences.size()); + m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); AddedReferences = {}; } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3576,18 +3577,7 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_PrecachedReferences) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - - for (const IoHash& ReferenceHash : m_UncachedReferences) + for (const IoHash& ReferenceHash : m_References) { if (IoCids.erase(ReferenceHash) == 1) { @@ -3598,10 +3588,8 @@ public: } } } - CacheBucket& m_CacheBucket; - std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; - std::vector<IoHash> m_PrecachedReferences; - std::vector<IoHash> m_UncachedReferences; + CacheBucket& m_CacheBucket; + std::vector<IoHash> m_References; }; std::vector<GcReferenceChecker*> @@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Reset(m_FreeMemCachedPayloads); } +RwLock::SharedLockScope +ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock() +{ + return RwLock::SharedLockScope(m_IndexLock); +} + #if ZEN_WITH_TESTS void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) @@ -3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); - + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedBuckets) + { + m_CapturedBuckets->push_back(std::string(BucketName)); + } + }); return Result; } @@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +std::vector<RwLock::SharedLockScope> +ZenCacheDiskLayer::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + for (auto& Kv : m_Buckets) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + +void +ZenCacheDiskLayer::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedBuckets); + m_CapturedBuckets = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedBuckets); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ZenCacheDiskLayer::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedBuckets); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedBuckets.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheDiskLayer::GetCapturedBuckets() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedBuckets) + { + return *m_CapturedBuckets; + } + return {}; +} + void ZenCacheDiskLayer::MemCacheTrim() { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index d9c2d3e59..d7d594af7 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -375,6 +375,23 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); } +std::vector<RwLock::SharedLockScope> +ZenCacheNamespace::GetGcReferencerLocks() +{ + return m_DiskLayer.GetGcReferencerLocks(); +} + +void +ZenCacheNamespace::EnableUpdateCapture() +{ + m_DiskLayer.EnableUpdateCapture(); +} +void +ZenCacheNamespace::DisableUpdateCapture() +{ + m_DiskLayer.DisableUpdateCapture(); +} + #if ZEN_WITH_TESTS void ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time) @@ -439,11 +456,15 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), m_Configuration.NamespaceConfig); } + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); } ZenCacheStore::~ZenCacheStore() { ZEN_INFO("closing cache store at '{}'", m_BasePath); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false}); m_Namespaces.clear(); } @@ -844,6 +865,14 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) m_JobQueue, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), m_Configuration.NamespaceConfig)); + + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedNamespaces) + { + m_CapturedNamespaces->push_back(std::string(Namespace)); + } + }); + return NewNamespace.first->second.get(); } @@ -1003,6 +1032,212 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu return {}; } +std::vector<RwLock::SharedLockScope> +ZenCacheStore::LockState(GcCtx&) +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock)); + for (auto& NamespaceIt : m_Namespaces) + { + std::vector<RwLock::SharedLockScope> NamespaceLocks = NamespaceIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(NamespaceLocks.begin()); It != std::make_move_iterator(NamespaceLocks.end()); It++) + { + Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + +void +ZenCacheStore::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedNamespaces); + m_CapturedNamespaces = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedNamespaces); + } + m_UpdateCaptureRefCounter++; + }); + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->EnableUpdateCapture(); + } +} + +void +ZenCacheStore::DisableUpdateCapture() +{ + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->DisableUpdateCapture(); + } + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedNamespaces); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedNamespaces.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheStore::GetCapturedNamespaces() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedNamespaces) + { + return *m_CapturedNamespaces; + } + return {}; +} + +std::string +ZenCacheStore::GetGcName(GcCtx&) +{ + return fmt::format("zencachestore: '{}'", m_BasePath.string()); +} + +GcStoreCompactor* +ZenCacheStore::RemoveExpiredData(GcCtx&, GcStats&) +{ + return nullptr; +} + +class CacheStoreReferenceChecker : public GcReferenceChecker +{ +public: + CacheStoreReferenceChecker(ZenCacheStore& InCacheStore) : m_CacheStore(InCacheStore) { m_CacheStore.EnableUpdateCapture(); } + + virtual ~CacheStoreReferenceChecker() + { + try + { + m_CacheStore.DisableUpdateCapture(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheStoreReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual std::string GetGcName(GcCtx&) override { return "cachestore"; } + virtual void PreCache(GcCtx&) override {} + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::UpdateLockedState"); + + Stopwatch Timer; + + std::vector<ZenCacheDiskLayer::CacheBucket*> AddedBuckets; + + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachestore [LOCKSTATE] '{}': found {} references in {} in {} new buckets", + "cachestore", + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + AddedBuckets.size()); + }); + + std::vector<std::string> AddedNamespaces = m_CacheStore.GetCapturedNamespaces(); + + for (const std::string& AddedNamespace : AddedNamespaces) + { + if (auto It = m_CacheStore.m_Namespaces.find(AddedNamespace); It != m_CacheStore.m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + for (auto& BucketKV : Namespace.m_DiskLayer.m_Buckets) + { + AddedBuckets.push_back(BucketKV.second.get()); + } + } + } + for (auto& NamepaceKV : m_CacheStore.m_Namespaces) + { + ZenCacheNamespace& Namespace = *NamepaceKV.second; + std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBuckets(); + for (const std::string& AddedBucketName : NamespaceAddedBuckets) + { + if (auto It = Namespace.m_DiskLayer.m_Buckets.find(AddedBucketName); It != Namespace.m_DiskLayer.m_Buckets.end()) + { + AddedBuckets.push_back(It->second.get()); + } + } + } + + for (ZenCacheDiskLayer::CacheBucket* Bucket : AddedBuckets) + { + bool Continue = Bucket->GetReferencesLocked(Ctx, m_References); + if (!Continue) + { + break; + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Z$::RemoveUsedReferencesFromSet"); + + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "projectstore", + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + for (const IoHash& ReferenceHash : m_References) + { + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } + } + } + +private: + ZenCacheStore& m_CacheStore; + std::vector<IoHash> m_References; +}; + +std::vector<GcReferenceChecker*> +ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachestore [CREATE CHECKERS] '{}': completed in {}", m_BasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new CacheStoreReferenceChecker(*this)); + return Checkers; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS |