diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-13 08:53:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-13 08:53:01 +0200 |
| commit | b71d52375e41a084e661d0f55f044ca8982312a4 (patch) | |
| tree | f44e74a13e29d50ab36d1eebfdb4c7e606d879a9 /src/zenstore/cache/cachedisklayer.cpp | |
| parent | 5.5.3-pre1 (diff) | |
| download | zen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip | |
Make sure we monitor for new projects, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 305 |
1 file changed, 179 insertions, 126 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9dd2e4a67..f865e1c3c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } +bool +ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences) +{ + auto GetAttachments = [&](const void* CbObjectData) { + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); + }; + + std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::vector<BlockStoreLocation> InlineLocations; + std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; + + { + std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; + + for (const auto& Entry : m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(std::make_pair(Key, Loc)); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); + size_t ChunkIndex = InlineLocations.size(); + InlineLocations.push_back(ChunkLocation); + InlineKeys.push_back(Key); + if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) + { + InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); + } + else + { + BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); + 
InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); + } + } + } + + for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) + { + ZEN_ASSERT(!ChunkIndexes.empty()); + + bool Continue = m_BlockStore.IterateBlock( + InlineLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ZEN_UNUSED(ChunkIndex, Size); + GetAttachments(Data); + return !Ctx.IsCancelledFlag.load(); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ZEN_UNUSED(ChunkIndex); + GetAttachments(File.GetChunk(Offset, Size).GetData()); + return !Ctx.IsCancelledFlag.load(); + }); + + if (!Continue && Ctx.IsCancelledFlag.load()) + { + return false; + } + } + } + for (const auto& It : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first); + if (Buffer) + { + GetAttachments(Buffer.GetData()); + } + } + return true; +} + class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; @@ -3396,7 +3491,6 @@ public: { try { - m_IndexLock.reset(); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) @@ -3419,114 +3513,25 @@ public: } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - auto GetAttachments = [&](const void* CbObjectData) { - CbObjectView Obj(CbObjectData); - Obj.IterateAttachments([&](CbFieldView Field) { m_PrecachedReferences.emplace_back(Field.AsAttachment()); }); - }; - - // Refresh cache - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = 
std::make_unique<HashSet>(); }); - std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; - { - std::vector<IoHash> InlineKeys; - std::vector<BlockStoreLocation> InlineLocations; - std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; - - { - std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; - - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - for (const auto& Entry : m_CacheBucket.m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - IndexLock.ReleaseNow(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - - PayloadIndex EntryIndex = Entry.second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(std::make_pair(Key, Loc)); - continue; - } - - BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - size_t ChunkIndex = InlineLocations.size(); - InlineLocations.push_back(ChunkLocation); - InlineKeys.push_back(Key); - if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) - { - InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); - } - else - { - BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); - InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); - } - } - } - - for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) - { - ZEN_ASSERT(!ChunkIndexes.empty()); - - bool Continue = m_CacheBucket.m_BlockStore.IterateBlock( - InlineLocations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ZEN_UNUSED(ChunkIndex, Size); - GetAttachments(Data); - return !Ctx.IsCancelledFlag.load(); - }, - 
[&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ZEN_UNUSED(ChunkIndex); - GetAttachments(File.GetChunk(Offset, Size).GetData()); - return !Ctx.IsCancelledFlag.load(); - }); - - if (!Continue && Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - } - } - for (const auto& It : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); + IndexLock.ReleaseNow(); - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); - if (Buffer) - { - GetAttachments(Buffer.GetData()); - } - } + if (!Continue) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3536,32 +3541,28 @@ public: } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size() + m_UncachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); if (Ctx.IsCancelledFlag.load()) { - m_UncachedReferences = {}; - m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + m_References = {}; + m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); - 
m_UncachedReferences.reserve(AddedReferences.size()); - m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end()); + m_References.reserve(m_References.size() + AddedReferences.size()); + m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); AddedReferences = {}; } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3576,18 +3577,7 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_PrecachedReferences) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - - for (const IoHash& ReferenceHash : m_UncachedReferences) + for (const IoHash& ReferenceHash : m_References) { if (IoCids.erase(ReferenceHash) == 1) { @@ -3598,10 +3588,8 @@ public: } } } - CacheBucket& m_CacheBucket; - std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; - std::vector<IoHash> m_PrecachedReferences; - std::vector<IoHash> m_UncachedReferences; + CacheBucket& m_CacheBucket; + std::vector<IoHash> m_References; }; std::vector<GcReferenceChecker*> @@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Reset(m_FreeMemCachedPayloads); } +RwLock::SharedLockScope +ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock() +{ + return RwLock::SharedLockScope(m_IndexLock); +} + #if ZEN_WITH_TESTS void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) @@ -3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); - + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedBuckets) + { + m_CapturedBuckets->push_back(std::string(BucketName)); + } + 
}); return Result; } @@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +std::vector<RwLock::SharedLockScope> +ZenCacheDiskLayer::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + for (auto& Kv : m_Buckets) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + +void +ZenCacheDiskLayer::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedBuckets); + m_CapturedBuckets = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedBuckets); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ZenCacheDiskLayer::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedBuckets); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedBuckets.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheDiskLayer::GetCapturedBuckets() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedBuckets) + { + return *m_CapturedBuckets; + } + return {}; +} + void ZenCacheDiskLayer::MemCacheTrim() { |