diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-13 08:53:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-13 08:53:01 +0200 |
| commit | b71d52375e41a084e661d0f55f044ca8982312a4 (patch) | |
| tree | f44e74a13e29d50ab36d1eebfdb4c7e606d879a9 | |
| parent | 5.5.3-pre1 (diff) | |
| download | zen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip | |
Make sure we monitor for new projects, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/thread.h | 10 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 469 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 63 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 305 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 235 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 124 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 12 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 25 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/gc.h | 45 |
10 files changed, 998 insertions, 292 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 4461ccc0e..38024fed0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -77,6 +77,8 @@ - `--alias` - alias name of the share - replaces `--workspace` and `--share` options - `--chunks` - the chunk ids for the chunk or the share local paths for the chunk - Bugfix: Removed test data added at current folder when running test +- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2 +- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2 - Improvement: Various minor optimizations in cache package formatting - Improvement: Add batch fetch of cache values in the GetCacheValues request - Improvement: Use a smaller thread pool for network operations when doing oplog import to reduce risk of NIC/router failure diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h index bae630db9..9362802a1 100644 --- a/src/zencore/include/zencore/thread.h +++ b/src/zencore/include/zencore/thread.h @@ -35,6 +35,16 @@ public: struct SharedLockScope { + SharedLockScope(const SharedLockScope& Rhs) = delete; + SharedLockScope(SharedLockScope&& Rhs) : m_Lock(Rhs.m_Lock) { Rhs.m_Lock = nullptr; } + SharedLockScope& operator=(SharedLockScope&& Rhs) + { + ReleaseNow(); + m_Lock = Rhs.m_Lock; + Rhs.m_Lock = nullptr; + return *this; + } + SharedLockScope& operator=(const SharedLockScope& Rhs) = delete; SharedLockScope(RwLock& Lock) : m_Lock(&Lock) { Lock.AcquireShared(); } ~SharedLockScope() { ReleaseNow(); } diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 4c434c39b..beac1ab97 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1094,6 +1094,12 @@ void ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); + IterateOplogLocked(std::move(Handler)); +} + +void 
+ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler) +{ if (!m_Storage) { return; @@ -1264,22 +1270,25 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: } void -ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN) +ProjectStore::Oplog::CaptureUpdatedLSNs(std::span<const uint32_t> LSNs) { - if (m_UpdatedLSNs) - { - m_UpdatedLSNs->push_back(LSN); - } + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedLSNs) + { + m_CapturedLSNs->reserve(m_CapturedLSNs->size() + LSNs.size()); + m_CapturedLSNs->insert(m_CapturedLSNs->end(), LSNs.begin(), LSNs.end()); + } + }); } void ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) { - m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { - if (m_NonGCAttachments) + m_UpdateCaptureLock.WithExclusiveLock([this, AttachmentHashes]() { + if (m_CapturedAttachments) { - m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size()); - m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); + m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size()); + m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); } }); } @@ -1287,18 +1296,18 @@ ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentH void ProjectStore::Oplog::EnableUpdateCapture() { - m_OplogLock.WithExclusiveLock([&]() { + m_UpdateCaptureLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { - ZEN_ASSERT(!m_UpdatedLSNs); - ZEN_ASSERT(!m_NonGCAttachments); - m_UpdatedLSNs = std::make_unique<std::vector<int>>(); - m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + ZEN_ASSERT(!m_CapturedLSNs); + ZEN_ASSERT(!m_CapturedAttachments); + m_CapturedLSNs = std::make_unique<std::vector<uint32_t>>(); + m_CapturedAttachments = 
std::make_unique<std::vector<IoHash>>(); } else { - ZEN_ASSERT(m_UpdatedLSNs); - ZEN_ASSERT(m_NonGCAttachments); + ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedAttachments); } m_UpdateCaptureRefCounter++; }); @@ -1307,51 +1316,49 @@ ProjectStore::Oplog::EnableUpdateCapture() void ProjectStore::Oplog::DisableUpdateCapture() { - m_OplogLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_UpdatedLSNs); - ZEN_ASSERT(m_NonGCAttachments); + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedAttachments); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { - m_UpdatedLSNs.reset(); - m_NonGCAttachments.reset(); + m_CapturedLSNs.reset(); + m_CapturedAttachments.reset(); } }); } void -ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback) +ProjectStore::Oplog::IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback) { - if (m_UpdatedLSNs) - { - if (!m_Storage) + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedLSNs) { - return; - } - for (int UpdatedLSN : *m_UpdatedLSNs) - { - if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) + if (!m_Storage) { - Callback(m_Storage->GetOp(AddressEntryIt->second)); + return; + } + for (int UpdatedLSN : *m_CapturedLSNs) + { + if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) + { + Callback(m_Storage->GetOp(AddressEntryIt->second)); + } } } - } + }); } -void -ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback) +std::vector<IoHash> +ProjectStore::Oplog::GetCapturedAttachments() { - if (m_NonGCAttachments) + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedAttachments) { - for (const IoHash& ReferenceHash : *m_NonGCAttachments) - { - if 
(!Callback(ReferenceHash)) - { - break; - } - } + return *m_CapturedAttachments; } + return {}; } void @@ -1511,9 +1518,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; - - CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn); - return OpEntry.OpLsn; } @@ -1605,6 +1609,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) RwLock::ExclusiveLockScope OplogLock(m_OplogLock); const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); + CaptureUpdatedLSNs(std::array<uint32_t, 1u>({EntryId})); return EntryId; } @@ -1645,6 +1650,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) { EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); } + CaptureUpdatedLSNs(EntryIds); } return EntryIds; } @@ -1868,6 +1874,14 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem: .first->second.get(); Log->Write(); + + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedOplogs) + { + m_CapturedOplogs->push_back(std::string(OplogId)); + } + }); + return Log; } catch (const std::exception&) @@ -2125,6 +2139,61 @@ ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) return true; } +void +ProjectStore::Project::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedOplogs); + m_CapturedOplogs = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedOplogs); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::Project::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedOplogs); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter 
== 0) + { + m_CapturedOplogs.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::Project::GetCapturedOplogs() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedOplogs) + { + return *m_CapturedOplogs; + } + return {}; +} + +std::vector<RwLock::SharedLockScope> +ProjectStore::Project::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectLock)); + Locks.reserve(1 + m_Oplogs.size()); + for (auto& Kv : m_Oplogs) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + bool ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&, const std::string& EntryName, @@ -2221,11 +2290,13 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa m_Gc.AddGcContributor(this); m_Gc.AddGcStorage(this); m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); + m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcContributor(this); @@ -2559,6 +2630,13 @@ ProjectStore::NewProject(const std::filesystem::path& BasePath, Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedProjects) + { + m_CapturedProjects->push_back(std::string(ProjectId)); + } + }); + return Prj; } @@ -3753,6 +3831,48 @@ ProjectStore::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +void +ProjectStore::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedProjects); + m_CapturedProjects = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedProjects); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::DisableUpdateCapture() +{ + 
m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedProjects); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedProjects.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::GetCapturedProjects() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedProjects) + { + return *m_CapturedProjects; + } + return {}; +} + std::string ProjectStore::GetGcName(GcCtx&) { @@ -3988,17 +4108,13 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) class ProjectStoreReferenceChecker : public GcReferenceChecker { public: - ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner) : m_Oplog(Owner) {} + ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); } virtual ~ProjectStoreReferenceChecker() { try { - m_OplogLock.reset(); - if (m_OplogCaptureEnabled) - { - m_Oplog.DisableUpdateCapture(); - } + m_ProjectStore.DisableUpdateCapture(); } catch (const std::exception& Ex) { @@ -4006,42 +4122,190 @@ public: } } - virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); } + virtual std::string GetGcName(GcCtx&) override { return "projectstore"; } + virtual void PreCache(GcCtx&) override {} - virtual void PreCache(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Store::PreCache"); + ZEN_TRACE_CPU("Store::UpdateLockedState"); + + Stopwatch Timer; + + std::vector<ProjectStore::Oplog*> AddedOplogs; - Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", - m_Oplog.m_BasePath, + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs", + "projectstore", m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - 
m_Oplog.m_OuterProject->Identifier, - m_Oplog.OplogId()); + AddedOplogs.size()); + }); + + std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjects(); + for (const std::string& AddedProject : AddedProjects) + { + if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end()) + { + ProjectStore::Project& Project = *It->second; + for (auto& OplogPair : Project.m_Oplogs) + { + ProjectStore::Oplog* Oplog = OplogPair.second.get(); + AddedOplogs.push_back(Oplog); + } + } + } + for (auto& ProjectPair : m_ProjectStore.m_Projects) + { + ProjectStore::Project& Project = *ProjectPair.second; + std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogs()); + for (const std::string& OplogName : AddedOplogNames) + { + if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) + { + ProjectStore::Oplog* Oplog = It->second.get(); + AddedOplogs.push_back(Oplog); + } + } + } + + for (ProjectStore::Oplog* Oplog : AddedOplogs) + { + size_t BaseReferenceCount = m_References.size(); + + Stopwatch InnerTimer; + const auto __ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}", + Oplog->m_BasePath, + m_References.size() - BaseReferenceCount, + NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()), + Oplog->OplogId()); + }); + + Oplog->IterateOplogLocked([&](const CbObjectView& UpdateOp) -> bool { + UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + return true; + }); + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet"); + + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + 
"projectstore", + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_Oplog.EnableUpdateCapture(); - m_OplogCaptureEnabled = true; + for (const IoHash& ReferenceHash : m_References) + { + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } + } + } - RwLock::SharedLockScope __(m_Oplog.m_OplogLock); - if (Ctx.IsCancelledFlag) +private: + ProjectStore& m_ProjectStore; + std::vector<IoHash> m_References; +}; + +class ProjectStoreOplogReferenceChecker : public GcReferenceChecker +{ +public: + ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog) + : m_ProjectStore(InProjectStore) + , m_Project(InProject) + , m_OplogName(InOplog) + { + m_Project->EnableUpdateCapture(); + } + + virtual ~ProjectStoreOplogReferenceChecker() + { + try { - return; + m_Project->DisableUpdateCapture(); + if (m_OplogCaptureEnabled) + { + ZEN_ASSERT(m_Oplog); + m_Oplog->DisableUpdateCapture(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what()); } - m_Oplog.IterateOplog([&](CbObjectView Op) { - Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + } + + virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogName); } + + virtual void PreCache(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Store::Oplog::PreCache"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + if (m_Oplog) + { + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", + m_Oplog->m_BasePath, + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Project->Identifier, + m_Oplog->OplogId()); + } }); + + if (auto It = m_Project->m_Oplogs.find(m_OplogName); It != m_Project->m_Oplogs.end()) + { + 
m_Oplog = It->second.get(); + m_Oplog->EnableUpdateCapture(); + m_OplogCaptureEnabled = true; + + RwLock::SharedLockScope __(m_Oplog->m_OplogLock); + if (Ctx.IsCancelledFlag) + { + return; + } + m_Oplog->IterateOplog([&](CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + }); + } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Store::LockState"); + ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); + if (!m_Oplog) + { + return; + } Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -4050,25 +4314,28 @@ public: return; } ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}", - m_Oplog.m_BasePath, + m_Oplog->m_BasePath, m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Oplog.m_OuterProject->Identifier, - m_Oplog.OplogId()); + m_Project->Identifier, + m_Oplog->OplogId()); }); - m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); - m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool { + m_Oplog->IterateCapturedLSNs([&](const CbObjectView& UpdateOp) -> bool { UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); return true; }); + std::vector<IoHash> AddedAttachments = m_Oplog->GetCapturedAttachments(); + m_References.insert(m_References.end(), AddedAttachments.begin(), AddedAttachments.end()); } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { - ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_OplogLock); + ZEN_TRACE_CPU("Store::Oplog::RemoveUsedReferencesFromSet"); + if (!m_Oplog) + { + return; + } size_t InitialCount = IoCids.size(); Stopwatch Timer; @@ -4078,7 +4345,7 @@ public: return; } ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", - 
m_Oplog.m_BasePath, + m_Oplog->m_BasePath, InitialCount - IoCids.size(), InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); @@ -4094,21 +4361,13 @@ public: } } } - m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool { - if (IoCids.erase(RawHash) == 1) - { - if (IoCids.empty()) - { - return false; - } - } - return true; - }); } - ProjectStore::Oplog& m_Oplog; - std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; - std::vector<IoHash> m_References; - bool m_OplogCaptureEnabled = false; + ProjectStore& m_ProjectStore; + Ref<ProjectStore::Project> m_Project; + std::string m_OplogName; + ProjectStore::Oplog* m_Oplog = nullptr; + std::vector<IoHash> m_References; + bool m_OplogCaptureEnabled = false; }; std::vector<GcReferenceChecker*> @@ -4135,6 +4394,8 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) DiscoverProjects(); std::vector<Ref<ProjectStore::Project>> Projects; + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new ProjectStoreReferenceChecker(*this)); { RwLock::SharedLockScope Lock(m_ProjectsLock); Projects.reserve(m_Projects.size()); @@ -4145,21 +4406,15 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) } } ProjectCount += Projects.size(); - std::vector<GcReferenceChecker*> Checkers; try { for (const Ref<ProjectStore::Project>& Project : Projects) { std::vector<std::string> OpLogs = Project->ScanForOplogs(); - Checkers.reserve(OpLogs.size()); + Checkers.reserve(Checkers.size() + OpLogs.size()); for (const std::string& OpLogId : OpLogs) { - ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); - if (Oplog == nullptr) - { - continue; - } - Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog)); + Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId)); OplogCount++; } } @@ -4177,6 +4432,22 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) return Checkers; } +std::vector<RwLock::SharedLockScope> +ProjectStore::LockState(GcCtx&) +{ + 
std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); + for (auto& ProjectIt : m_Projects) + { + std::vector<RwLock::SharedLockScope> ProjectLocks = ProjectIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(ProjectLocks.begin()); It != std::make_move_iterator(ProjectLocks.end()); It++) + { + Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index fd8443660..1b72a2688 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -58,7 +58,7 @@ static_assert(IsPow2(sizeof(OplogEntry))); package data split into separate chunks for bulk data, exports and header information. */ -class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer +class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer, public GcReferenceLocker { struct OplogStorage; @@ -95,6 +95,7 @@ public: void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObjectView)>&& Fn); void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn); + void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn); size_t GetOplogEntryCount() const; std::optional<CbObject> GetOpByKey(const Oid& Key); @@ -149,13 +150,14 @@ public: void AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid::Hasher>& ChunkMappings); - void CaptureUpdatedLSN(RwLock::ExclusiveLockScope& OplogLock, uint32_t LSN); + void CaptureUpdatedLSNs(std::span<const uint32_t> LSNs); void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes); - void EnableUpdateCapture(); - void 
DisableUpdateCapture(); - void IterateUpdatedLSNs(RwLock::SharedLockScope& OplogLock, std::function<bool(const CbObjectView& UpdateOp)>&& Callback); - void IterateAddedAttachments(RwLock::SharedLockScope& OplogLock, std::function<bool(const IoHash& RawHash)>&& Callback); + void EnableUpdateCapture(); + void DisableUpdateCapture(); + void IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback); + std::vector<IoHash> GetCapturedAttachments(); + RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); } private: struct FileMapEntry @@ -181,12 +183,13 @@ public: tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<int>> m_UpdatedLSNs; - std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<uint32_t>> m_CapturedLSNs; + std::unique_ptr<std::vector<IoHash>> m_CapturedAttachments; RefPtr<OplogStorage> m_Storage; - std::string m_OplogId; + const std::string m_OplogId; RefPtr<OplogStorage> GetStorage(); @@ -233,6 +236,7 @@ public: void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); + friend class ProjectStoreOplogReferenceChecker; friend class ProjectStoreReferenceChecker; }; @@ -272,6 +276,12 @@ public: uint64_t TotalSize() const; bool PrepareForDelete(std::filesystem::path& OutDeletePath); + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedOplogs(); + + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + private: ProjectStore* m_ProjectStore; CidStore& m_CidStore; @@ -280,6 +290,9 @@ public: 
std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs; std::filesystem::path m_OplogStoragePath; mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedOplogs; std::filesystem::path BasePathForOplog(std::string_view OplogId); bool IsExpired(const RwLock::SharedLockScope&, @@ -288,6 +301,9 @@ public: const GcClock::TimePoint ExpireTime); void WriteAccessTimes(); void ReadAccessTimes(); + + friend class ProjectStoreOplogReferenceChecker; + friend class ProjectStoreReferenceChecker; }; // Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId); @@ -323,6 +339,8 @@ public: virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + CbArray GetProjectsList(); std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, @@ -392,19 +410,28 @@ public: bool AreDiskWritesAllowed() const; + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedProjects(); + private: - LoggerRef m_Log; - GcManager& m_Gc; - CidStore& m_CidStore; - JobQueue& m_JobQueue; - std::filesystem::path m_ProjectBasePath; - mutable RwLock m_ProjectsLock; - std::map<std::string, Ref<Project>> m_Projects; - const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + LoggerRef m_Log; + GcManager& m_Gc; + CidStore& m_CidStore; + JobQueue& m_JobQueue; + std::filesystem::path m_ProjectBasePath; + mutable RwLock m_ProjectsLock; + std::map<std::string, Ref<Project>> m_Projects; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> 
m_CapturedProjects; std::filesystem::path BasePathForProject(std::string_view ProjectId); friend class ProjectStoreGcStoreCompactor; + friend class ProjectStoreOplogReferenceChecker; + friend class ProjectStoreReferenceChecker; }; void prj_forcelink(); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9dd2e4a67..f865e1c3c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys)); } +bool +ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences) +{ + auto GetAttachments = [&](const void* CbObjectData) { + CbObjectView Obj(CbObjectData); + Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); }); + }; + + std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; + { + std::vector<IoHash> InlineKeys; + std::vector<BlockStoreLocation> InlineLocations; + std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; + + { + std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; + + for (const auto& Entry : m_Index) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + PayloadIndex EntryIndex = Entry.second; + const BucketPayload& Payload = m_Payloads[EntryIndex]; + const DiskLocation& Loc = Payload.Location; + + if (!Loc.IsFlagSet(DiskLocation::kStructured)) + { + continue; + } + const IoHash& Key = Entry.first; + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + { + StandaloneKeys.push_back(std::make_pair(Key, Loc)); + continue; + } + + BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment); + size_t ChunkIndex = InlineLocations.size(); + InlineLocations.push_back(ChunkLocation); + InlineKeys.push_back(Key); + if (auto It = 
BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) + { + InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); + } + else + { + BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); + InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); + } + } + } + + for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes) + { + ZEN_ASSERT(!ChunkIndexes.empty()); + + bool Continue = m_BlockStore.IterateBlock( + InlineLocations, + ChunkIndexes, + [&](size_t ChunkIndex, const void* Data, uint64_t Size) { + ZEN_UNUSED(ChunkIndex, Size); + GetAttachments(Data); + return !Ctx.IsCancelledFlag.load(); + }, + [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { + ZEN_UNUSED(ChunkIndex); + GetAttachments(File.GetChunk(Offset, Size).GetData()); + return !Ctx.IsCancelledFlag.load(); + }); + + if (!Continue && Ctx.IsCancelledFlag.load()) + { + return false; + } + } + } + for (const auto& It : StandaloneKeys) + { + if (Ctx.IsCancelledFlag.load()) + { + return false; + } + + IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first); + if (Buffer) + { + GetAttachments(Buffer.GetData()); + } + } + return true; +} + class DiskBucketReferenceChecker : public GcReferenceChecker { using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex; @@ -3396,7 +3491,6 @@ public: { try { - m_IndexLock.reset(); m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } catch (const std::exception& Ex) @@ -3419,114 +3513,25 @@ public: } ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - auto GetAttachments = [&](const void* CbObjectData) { - CbObjectView Obj(CbObjectData); - Obj.IterateAttachments([&](CbFieldView Field) { 
m_PrecachedReferences.emplace_back(Field.AsAttachment()); }); - }; - - // Refresh cache - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); }); - std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys; - { - std::vector<IoHash> InlineKeys; - std::vector<BlockStoreLocation> InlineLocations; - std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes; - - { - std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes; - - RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); - for (const auto& Entry : m_CacheBucket.m_Index) - { - if (Ctx.IsCancelledFlag.load()) - { - IndexLock.ReleaseNow(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - - PayloadIndex EntryIndex = Entry.second; - const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex]; - const DiskLocation& Loc = Payload.Location; - - if (!Loc.IsFlagSet(DiskLocation::kStructured)) - { - continue; - } - const IoHash& Key = Entry.first; - if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) - { - StandaloneKeys.push_back(std::make_pair(Key, Loc)); - continue; - } - - BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment); - size_t ChunkIndex = InlineLocations.size(); - InlineLocations.push_back(ChunkLocation); - InlineKeys.push_back(Key); - if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end()) - { - InlineBlockChunkIndexes[It->second].push_back(ChunkIndex); - } - else - { - BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size()); - InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex}); - } - } - } - - for (std::vector<std::size_t> ChunkIndexes : 
InlineBlockChunkIndexes) - { - ZEN_ASSERT(!ChunkIndexes.empty()); - - bool Continue = m_CacheBucket.m_BlockStore.IterateBlock( - InlineLocations, - ChunkIndexes, - [&](size_t ChunkIndex, const void* Data, uint64_t Size) { - ZEN_UNUSED(ChunkIndex, Size); - GetAttachments(Data); - return !Ctx.IsCancelledFlag.load(); - }, - [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) { - ZEN_UNUSED(ChunkIndex); - GetAttachments(File.GetChunk(Offset, Size).GetData()); - return !Ctx.IsCancelledFlag.load(); - }); - - if (!Continue && Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } - } - } - for (const auto& It : StandaloneKeys) - { - if (Ctx.IsCancelledFlag.load()) - { - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); - return; - } + RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock); + bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References); + IndexLock.ReleaseNow(); - IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first); - if (Buffer) - { - GetAttachments(Buffer.GetData()); - } - } + if (!Continue) + { + m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Z$::Bucket::LockState"); + ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState"); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3536,32 +3541,28 @@ public: } ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}", m_CacheBucket.m_BucketDir, - m_PrecachedReferences.size() + m_UncachedReferences.size(), + m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock); if (Ctx.IsCancelledFlag.load()) { - m_UncachedReferences = {}; - 
m_IndexLock.reset(); - m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); }); + m_References = {}; + m_CacheBucket.m_TrackedReferences.reset(); return; } ZEN_ASSERT(m_CacheBucket.m_TrackedReferences); HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences); - m_UncachedReferences.reserve(AddedReferences.size()); - m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end()); + m_References.reserve(m_References.size() + AddedReferences.size()); + m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end()); AddedReferences = {}; } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_IndexLock); size_t InitialCount = IoCids.size(); Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -3576,18 +3577,7 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_PrecachedReferences) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } - - for (const IoHash& ReferenceHash : m_UncachedReferences) + for (const IoHash& ReferenceHash : m_References) { if (IoCids.erase(ReferenceHash) == 1) { @@ -3598,10 +3588,8 @@ public: } } } - CacheBucket& m_CacheBucket; - std::unique_ptr<RwLock::SharedLockScope> m_IndexLock; - std::vector<IoHash> m_PrecachedReferences; - std::vector<IoHash> m_UncachedReferences; + CacheBucket& m_CacheBucket; + std::vector<IoHash> m_References; }; std::vector<GcReferenceChecker*> @@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&, Reset(m_FreeMemCachedPayloads); } +RwLock::SharedLockScope +ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock() +{ + return RwLock::SharedLockScope(m_IndexLock); +} + #if ZEN_WITH_TESTS void ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time) @@ 
-3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) CacheBucket* Result = Bucket.get(); m_Buckets.emplace(BucketName, std::move(Bucket)); - + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedBuckets) + { + m_CapturedBuckets->push_back(std::string(BucketName)); + } + }); return Result; } @@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st return Details; } +std::vector<RwLock::SharedLockScope> +ZenCacheDiskLayer::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_Lock)); + for (auto& Kv : m_Buckets) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + +void +ZenCacheDiskLayer::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedBuckets); + m_CapturedBuckets = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedBuckets); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ZenCacheDiskLayer::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedBuckets); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedBuckets.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheDiskLayer::GetCapturedBuckets() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedBuckets) + { + return *m_CapturedBuckets; + } + return {}; +} + void ZenCacheDiskLayer::MemCacheTrim() { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index d9c2d3e59..d7d594af7 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -375,6 +375,23 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st return 
m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter); } +std::vector<RwLock::SharedLockScope> +ZenCacheNamespace::GetGcReferencerLocks() +{ + return m_DiskLayer.GetGcReferencerLocks(); +} + +void +ZenCacheNamespace::EnableUpdateCapture() +{ + m_DiskLayer.EnableUpdateCapture(); +} +void +ZenCacheNamespace::DisableUpdateCapture() +{ + m_DiskLayer.DisableUpdateCapture(); +} + #if ZEN_WITH_TESTS void ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time) @@ -439,11 +456,15 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName), m_Configuration.NamespaceConfig); } + m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); } ZenCacheStore::~ZenCacheStore() { ZEN_INFO("closing cache store at '{}'", m_BasePath); + m_Gc.RemoveGcReferenceLocker(*this); + m_Gc.RemoveGcReferencer(*this); SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false}); m_Namespaces.clear(); } @@ -844,6 +865,14 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) m_JobQueue, m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), m_Configuration.NamespaceConfig)); + + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedNamespaces) + { + m_CapturedNamespaces->push_back(std::string(Namespace)); + } + }); + return NewNamespace.first->second.get(); } @@ -1003,6 +1032,212 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu return {}; } +std::vector<RwLock::SharedLockScope> +ZenCacheStore::LockState(GcCtx&) +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_NamespacesLock)); + for (auto& NamespaceIt : m_Namespaces) + { + std::vector<RwLock::SharedLockScope> NamespaceLocks = NamespaceIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(NamespaceLocks.begin()); It != std::make_move_iterator(NamespaceLocks.end()); It++) + { + 
Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + +void +ZenCacheStore::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedNamespaces); + m_CapturedNamespaces = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedNamespaces); + } + m_UpdateCaptureRefCounter++; + }); + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->EnableUpdateCapture(); + } +} + +void +ZenCacheStore::DisableUpdateCapture() +{ + for (auto& NamespaceIt : m_Namespaces) + { + NamespaceIt.second->DisableUpdateCapture(); + } + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedNamespaces); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedNamespaces.reset(); + } + }); +} + +std::vector<std::string> +ZenCacheStore::GetCapturedNamespaces() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedNamespaces) + { + return *m_CapturedNamespaces; + } + return {}; +} + +std::string +ZenCacheStore::GetGcName(GcCtx&) +{ + return fmt::format("zencachestore: '{}'", m_BasePath.string()); +} + +GcStoreCompactor* +ZenCacheStore::RemoveExpiredData(GcCtx&, GcStats&) +{ + return nullptr; +} + +class CacheStoreReferenceChecker : public GcReferenceChecker +{ +public: + CacheStoreReferenceChecker(ZenCacheStore& InCacheStore) : m_CacheStore(InCacheStore) { m_CacheStore.EnableUpdateCapture(); } + + virtual ~CacheStoreReferenceChecker() + { + try + { + m_CacheStore.DisableUpdateCapture(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheStoreReferenceChecker threw exception: '{}'", Ex.what()); + } + } + + virtual std::string GetGcName(GcCtx&) override { return "cachestore"; } + virtual void PreCache(GcCtx&) override {} + virtual void UpdateLockedState(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Z$::UpdateLockedState"); + + Stopwatch Timer; + + 
std::vector<ZenCacheDiskLayer::CacheBucket*> AddedBuckets; + + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachestore [LOCKSTATE] '{}': found {} references in {} in {} new buckets", + "cachestore", + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + AddedBuckets.size()); + }); + + std::vector<std::string> AddedNamespaces = m_CacheStore.GetCapturedNamespaces(); + + for (const std::string& AddedNamespace : AddedNamespaces) + { + if (auto It = m_CacheStore.m_Namespaces.find(AddedNamespace); It != m_CacheStore.m_Namespaces.end()) + { + ZenCacheNamespace& Namespace = *It->second; + for (auto& BucketKV : Namespace.m_DiskLayer.m_Buckets) + { + AddedBuckets.push_back(BucketKV.second.get()); + } + } + } + for (auto& NamepaceKV : m_CacheStore.m_Namespaces) + { + ZenCacheNamespace& Namespace = *NamepaceKV.second; + std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBuckets(); + for (const std::string& AddedBucketName : NamespaceAddedBuckets) + { + if (auto It = Namespace.m_DiskLayer.m_Buckets.find(AddedBucketName); It != Namespace.m_DiskLayer.m_Buckets.end()) + { + AddedBuckets.push_back(It->second.get()); + } + } + } + + for (ZenCacheDiskLayer::CacheBucket* Bucket : AddedBuckets) + { + bool Continue = Bucket->GetReferencesLocked(Ctx, m_References); + if (!Continue) + { + break; + } + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Z$::RemoveUsedReferencesFromSet"); + + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "projectstore", + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + + for (const IoHash& ReferenceHash : m_References) + { + if (IoCids.erase(ReferenceHash) == 
1) + { + if (IoCids.empty()) + { + return; + } + } + } + } + +private: + ZenCacheStore& m_CacheStore; + std::vector<IoHash> m_References; +}; + +std::vector<GcReferenceChecker*> +ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx) +{ + ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: cachestore [CREATE CHECKERS] '{}': completed in {}", m_BasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new CacheStoreReferenceChecker(*this)); + return Checkers; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index e8cf6ec5e..8db34b9c5 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -382,7 +382,7 @@ WriteReferencerStats(CbObjectWriter& Writer, const GcReferencerStats& Stats, boo Writer << "CreateReferenceCheckers" << ToTimeSpan(Stats.CreateReferenceCheckersMS); Writer << "PreCacheState" << ToTimeSpan(Stats.PreCacheStateMS); - Writer << "LockState" << ToTimeSpan(Stats.LockStateMS); + Writer << "UpdateLockedState" << ToTimeSpan(Stats.UpdateLockedStateMS); Writer << "Elapsed" << ToTimeSpan(Stats.ElapsedMS); }; @@ -452,6 +452,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable Writer << "CreateReferenceCheckers" << ToTimeSpan(Result.CreateReferenceCheckersMS); Writer << "PreCacheState" << ToTimeSpan(Result.PreCacheStateMS); Writer << "LockState" << ToTimeSpan(Result.LockStateMS); + Writer << "UpdateLockedState" << ToTimeSpan(Result.UpdateLockedStateMS); Writer << "CreateReferencePruners" << ToTimeSpan(Result.CreateReferencePrunersMS); Writer << "RemoveUnreferencedData" << ToTimeSpan(Result.RemoveUnreferencedDataMS); @@ -510,7 +511,7 @@ void Sum(GcReferencerStats& Stat) { Stat.ElapsedMS = Stat.RemoveExpiredDataStats.ElapsedMS + 
Stat.CompactStoreStats.ElapsedMS + Stat.CreateReferenceCheckersMS + - Stat.PreCacheStateMS + Stat.LockStateMS; + Stat.PreCacheStateMS + Stat.UpdateLockedStateMS; } void @@ -521,7 +522,7 @@ Add(GcReferencerStats& Sum, const GcReferencerStats& Sub) Sum.CreateReferenceCheckersMS += Sub.CreateReferenceCheckersMS; Sum.PreCacheStateMS += Sub.PreCacheStateMS; - Sum.LockStateMS += Sub.LockStateMS; + Sum.UpdateLockedStateMS += Sub.UpdateLockedStateMS; Sum.ElapsedMS += Sub.ElapsedMS; } @@ -584,6 +585,19 @@ GcManager::RemoveGcReferencer(GcReferencer& Referencer) } void +GcManager::AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_GcReferencerLockers.push_back(&ReferenceLocker); +} +void +GcManager::RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker) +{ + RwLock::ExclusiveLockScope _(m_Lock); + std::erase_if(m_GcReferencerLockers, [&](GcReferenceLocker* $) { return $ == &ReferenceLocker; }); +} + +void GcManager::AddGcReferenceStore(GcReferenceStore& ReferenceStore) { RwLock::ExclusiveLockScope _(m_Lock); @@ -879,59 +893,90 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } + std::vector<RwLock::SharedLockScope> LockerScopes; SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS); ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS))); { - ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); if (!ReferenceCheckers.empty()) { if (CheckGCCancel()) { return Sum(Result, true); } - ZEN_TRACE_CPU("GcV2::LockState"); - - // Locking all references checkers so we have a steady state of which references are used - // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until - // we delete the ReferenceCheckers - Latch WorkLeft(1); - + ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size()); { - SCOPED_TIMER(Result.LockStateMS = 
std::chrono::milliseconds(Timer.GetElapsedTimeMs()); - if (Ctx.Settings.Verbose) { - ZEN_INFO("GCV2: Locked state using {} reference checkers in {}", - ReferenceCheckers.size(), - NiceTimeSpanMs(Result.LockStateMS.count())); - }); - for (auto& It : ReferenceCheckers) + ZEN_TRACE_CPU("GcV2::LockReferencers"); + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until + // we delete the ReferenceLockers + Latch WorkLeft(1); { - if (CheckGCCancel()) + SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); + if (Ctx.Settings.Verbose) { + ZEN_INFO("GCV2: Locked referencers using {} reference lockers in {}", + ReferenceCheckers.size(), + NiceTimeSpanMs(Result.LockStateMS.count())); + }); + for (GcReferenceLocker* ReferenceLocker : m_GcReferencerLockers) { - WorkLeft.CountDown(); - WorkLeft.Wait(); - return Sum(Result, true); - } - - GcReferenceChecker* Checker = It.first.get(); - size_t Index = It.second; - std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; - WorkLeft.AddCount(1); - ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { - auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); - try + std::vector<RwLock::SharedLockScope> LockScopes = ReferenceLocker->LockState(Ctx); + for (auto It = std::make_move_iterator(LockScopes.begin()); + It != std::make_move_iterator(LockScopes.end()); + It++) { - SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());); - Checker->LockState(Ctx); + LockerScopes.emplace_back(std::move(*It)); } - catch (const std::exception& Ex) + } + } + } + ZEN_INFO("GCV2: Updating locked state for {} reference checkers", ReferenceCheckers.size()); + { + ZEN_TRACE_CPU("GcV2::UpdateLockedState"); + + // Locking all references checkers so we have a steady state of which references are used + // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) 
until + // we delete the ReferenceCheckers + Latch WorkLeft(1); + + { + SCOPED_TIMER(Result.UpdateLockedStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()); + if (Ctx.Settings.Verbose) { + ZEN_INFO("GCV2: Updated locked state using {} reference checkers in {}", + ReferenceCheckers.size(), + NiceTimeSpanMs(Result.UpdateLockedStateMS.count())); + }); + for (auto& It : ReferenceCheckers) + { + if (CheckGCCancel()) { - ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what()); - SetCancelGC(true); + WorkLeft.CountDown(); + WorkLeft.Wait(); + return Sum(Result, true); } - }); + + GcReferenceChecker* Checker = It.first.get(); + size_t Index = It.second; + std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index]; + WorkLeft.AddCount(1); + ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() { + auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); }); + try + { + SCOPED_TIMER(Stats->second.UpdateLockedStateMS = + std::chrono::milliseconds(Timer.GetElapsedTimeMs());); + Checker->UpdateLockedState(Ctx); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("GCV2: Failed Updating locked state for {}. 
Reason: '{}'", + Checker->GetGcName(Ctx), + Ex.what()); + SetCancelGC(true); + } + }); + } + WorkLeft.CountDown(); + WorkLeft.Wait(); } - WorkLeft.CountDown(); - WorkLeft.Wait(); } } } @@ -1020,6 +1065,7 @@ GcManager::CollectGarbage(const GcSettings& Settings) } } // Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed + LockerScopes.clear(); ReferenceCheckers.clear(); ReferencePruners.clear(); } diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 9dee4d3f7..537f4396a 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -197,6 +197,12 @@ public: CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedBuckets(); + #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -227,6 +233,8 @@ public: void ScrubStorage(ScrubContext& Ctx); void GatherReferences(GcContext& GcCtx); void CollectGarbage(GcContext& GcCtx); + RwLock::SharedLockScope GetGcReferencerLock(); + bool GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences); inline GcStorageSize StorageSize() const { @@ -461,12 +469,16 @@ private: mutable RwLock m_Lock; std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets; std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedBuckets; ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete; ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete; friend class 
DiskBucketStoreCompactor; friend class DiskBucketReferenceChecker; + friend class CacheStoreReferenceChecker; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 7460d01ce..9160db667 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -48,6 +48,7 @@ class JobQueue; projects from each other. */ + class ZenCacheNamespace final : public GcStorage, public GcContributor { public: @@ -118,6 +119,11 @@ public: CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS @@ -137,6 +143,8 @@ private: ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; + + friend class CacheStoreReferenceChecker; }; /** Cache store interface @@ -145,7 +153,7 @@ private: */ -class ZenCacheStore final : public RefCounted, public StatsProvider +class ZenCacheStore final : public RefCounted, public StatsProvider, public GcReferencer, public GcReferenceLocker { public: static constexpr std::string_view DefaultNamespace = @@ -271,6 +279,16 @@ public: // StatsProvider virtual void ReportMetrics(StatsMetrics& Statsd) override; + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + + void EnableUpdateCapture(); + void DisableUpdateCapture(); + 
std::vector<std::string> GetCapturedNamespaces(); + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); @@ -283,6 +301,9 @@ private: mutable RwLock m_NamespacesLock; NamespaceMap m_Namespaces; std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedNamespaces; GcManager& m_Gc; JobQueue& m_JobQueue; @@ -314,6 +335,8 @@ private: std::thread m_AsyncLoggingThread; std::atomic_bool m_WriteLogEnabled; std::atomic_bool m_AccessLogEnabled; + + friend class CacheStoreReferenceChecker; }; void structured_cachestore_forcelink(); diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h index 5262c6d2e..c3a71baa6 100644 --- a/src/zenstore/include/zenstore/gc.h +++ b/src/zenstore/include/zenstore/gc.h @@ -88,7 +88,7 @@ struct GcReferencerStats std::chrono::milliseconds CreateReferenceCheckersMS = {}; std::chrono::milliseconds PreCacheStateMS = {}; - std::chrono::milliseconds LockStateMS = {}; + std::chrono::milliseconds UpdateLockedStateMS = {}; std::chrono::milliseconds ElapsedMS = {}; }; @@ -115,6 +115,7 @@ struct GcResult std::chrono::milliseconds CreateReferenceCheckersMS = {}; std::chrono::milliseconds PreCacheStateMS = {}; std::chrono::milliseconds LockStateMS = {}; + std::chrono::milliseconds UpdateLockedStateMS = {}; std::chrono::milliseconds CreateReferencePrunersMS = {}; std::chrono::milliseconds RemoveUnreferencedDataMS = {}; @@ -177,13 +178,19 @@ public: virtual std::string GetGcName(GcCtx& Ctx) = 0; + // Read as much of the current state - nothing is locked for you here so you need to lock as appropriate virtual void PreCache(GcCtx& Ctx) = 0; - // Lock the state and make sure no references changes, usually a read-lock is taken until the destruction - // of the instance. 
Called once before any calls to RemoveUsedReferencesFromSet - // The implementation should be as fast as possible as LockState is part of a stop the world (from changes) - // until all instances of GcReferenceChecker are deleted - virtual void LockState(GcCtx& Ctx) = 0; + // Update the state after all ReferenceCheckers has completed PreCache and all ReferenceLockers has + // completed their LockState operation. + // At this stage all data that UpdateLockedState needs to touch should be locked by the ReferenceLockers. + // *IMPORTANT* Do *not* take any locks (shared or exclusive) in this code. + // This is because we need to acquire the locks in an ordered manner and not end up in a deadlock due to other code + // trying to get exclusive locks halfway through our execution. + // Called once before any calls to RemoveUsedReferencesFromSet. + // The implementation should be as fast as possible as UpdateLockedState is part of a stop the world (from changes) + // until all instances of GcReferenceChecker UpdateLockedState are completed + virtual void UpdateLockedState(GcCtx& Ctx) = 0; // Go through IoCids and see which ones are referenced. 
If it is the reference must be removed from IoCids // This function should use pre-cached information on what is referenced as we are in stop the world mode @@ -191,6 +198,22 @@ public: }; /** + * @brief An interface to implement a lock for Stop The World (from writing new data) + * + * This interface is registered/unregistered to GcManager vua AddGcReferenceLocker() and RemoveGcReferenceLockerr() + */ +class GcReferenceLocker +{ +public: + virtual ~GcReferenceLocker() = default; + + // Take all the locks needed to execute UpdateLockedState for the all the GcReferenceChecker in your domain + // Once all the GcReferenceChecker has executed UpdateLockedState and RemoveUsedReferencesFromSet for all + // domains has completed, the locks will be disposed and writes are allowed once again + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) = 0; +}; + +/** * @brief Interface to handle GC of data that references Cid data * * This interface is registered/unregistered to GcManager vua AddGcReferencer() and RemoveGcReferencer() @@ -203,7 +226,7 @@ protected: public: virtual std::string GetGcName(GcCtx& Ctx) = 0; - // Remove expired data based on either GcCtx::Settings CacheExpireTime/ProjectExpireTime + // Remove expired data based on either GcCtx::Settings CacheExpireTime or ProjectExpireTime virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) = 0; // Create 0-n GcReferenceChecker for this GcReferencer. 
Caller will manage lifetime of @@ -350,6 +373,9 @@ public: void AddGcReferencer(GcReferencer& Referencer); void RemoveGcReferencer(GcReferencer& Referencer); + void AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker); + void RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker); + void AddGcReferenceStore(GcReferenceStore& ReferenceStore); void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore); @@ -382,8 +408,9 @@ private: CidStore* m_CidStore = nullptr; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; - std::vector<GcReferencer*> m_GcReferencers; - std::vector<GcReferenceStore*> m_GcReferenceStores; + std::vector<GcReferencer*> m_GcReferencers; + std::vector<GcReferenceLocker*> m_GcReferencerLockers; + std::vector<GcReferenceStore*> m_GcReferenceStores; std::atomic_bool m_CancelGC{false}; }; |