diff options
| author | Dan Engelbrecht <[email protected]> | 2024-06-13 08:53:01 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-06-13 08:53:01 +0200 |
| commit | b71d52375e41a084e661d0f55f044ca8982312a4 (patch) | |
| tree | f44e74a13e29d50ab36d1eebfdb4c7e606d879a9 /src/zenserver | |
| parent | 5.5.3-pre1 (diff) | |
| download | zen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip | |
Make sure we monitor for new project, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 469 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 63 |
2 files changed, 415 insertions, 117 deletions
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 4c434c39b..beac1ab97 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1094,6 +1094,12 @@ void ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler) { RwLock::SharedLockScope _(m_OplogLock); + IterateOplogLocked(std::move(Handler)); +} + +void +ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler) +{ if (!m_Storage) { return; @@ -1264,22 +1270,25 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid: } void -ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN) +ProjectStore::Oplog::CaptureUpdatedLSNs(std::span<const uint32_t> LSNs) { - if (m_UpdatedLSNs) - { - m_UpdatedLSNs->push_back(LSN); - } + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedLSNs) + { + m_CapturedLSNs->reserve(m_CapturedLSNs->size() + LSNs.size()); + m_CapturedLSNs->insert(m_CapturedLSNs->end(), LSNs.begin(), LSNs.end()); + } + }); } void ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes) { - m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() { - if (m_NonGCAttachments) + m_UpdateCaptureLock.WithExclusiveLock([this, AttachmentHashes]() { + if (m_CapturedAttachments) { - m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size()); - m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); + m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size()); + m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end()); } }); } @@ -1287,18 +1296,18 @@ ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentH void ProjectStore::Oplog::EnableUpdateCapture() { - m_OplogLock.WithExclusiveLock([&]() { + m_UpdateCaptureLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { - ZEN_ASSERT(!m_UpdatedLSNs); - ZEN_ASSERT(!m_NonGCAttachments); - m_UpdatedLSNs = std::make_unique<std::vector<int>>(); - m_NonGCAttachments = std::make_unique<std::vector<IoHash>>(); + ZEN_ASSERT(!m_CapturedLSNs); + ZEN_ASSERT(!m_CapturedAttachments); + m_CapturedLSNs = std::make_unique<std::vector<uint32_t>>(); + m_CapturedAttachments = std::make_unique<std::vector<IoHash>>(); } else { - ZEN_ASSERT(m_UpdatedLSNs); - ZEN_ASSERT(m_NonGCAttachments); + ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedAttachments); } m_UpdateCaptureRefCounter++; }); @@ -1307,51 +1316,49 @@ ProjectStore::Oplog::EnableUpdateCapture() void ProjectStore::Oplog::DisableUpdateCapture() { - m_OplogLock.WithExclusiveLock([&]() { - ZEN_ASSERT(m_UpdatedLSNs); - ZEN_ASSERT(m_NonGCAttachments); + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedLSNs); + ZEN_ASSERT(m_CapturedAttachments); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; if (m_UpdateCaptureRefCounter == 0) { - m_UpdatedLSNs.reset(); - m_NonGCAttachments.reset(); + m_CapturedLSNs.reset(); + m_CapturedAttachments.reset(); } }); } void -ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback) +ProjectStore::Oplog::IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback) { - if (m_UpdatedLSNs) - { - if (!m_Storage) + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedLSNs) { - return; - } - for (int UpdatedLSN : *m_UpdatedLSNs) - { - if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) + if (!m_Storage) { - Callback(m_Storage->GetOp(AddressEntryIt->second)); + return; + } + for (int UpdatedLSN : *m_CapturedLSNs) + { + if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end()) + { + Callback(m_Storage->GetOp(AddressEntryIt->second)); + } } } - } + }); } -void -ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback) +std::vector<IoHash> +ProjectStore::Oplog::GetCapturedAttachments() { - if (m_NonGCAttachments) + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedAttachments) { - for (const IoHash& ReferenceHash : *m_NonGCAttachments) - { - if (!Callback(ReferenceHash)) - { - break; - } - } + return *m_CapturedAttachments; } + return {}; } void @@ -1511,9 +1518,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn; - - CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn); - return OpEntry.OpLsn; } @@ -1605,6 +1609,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core) RwLock::ExclusiveLockScope OplogLock(m_OplogLock); const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry); + CaptureUpdatedLSNs(std::array<uint32_t, 1u>({EntryId})); return EntryId; } @@ -1645,6 +1650,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores) { EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]); } + CaptureUpdatedLSNs(EntryIds); } return EntryIds; } @@ -1868,6 +1874,14 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem: .first->second.get(); Log->Write(); + + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedOplogs) + { + m_CapturedOplogs->push_back(std::string(OplogId)); + } + }); + return Log; } catch (const std::exception&) @@ -2125,6 +2139,61 @@ ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath) return true; } +void +ProjectStore::Project::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedOplogs); + m_CapturedOplogs = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedOplogs); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::Project::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedOplogs); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedOplogs.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::Project::GetCapturedOplogs() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedOplogs) + { + return *m_CapturedOplogs; + } + return {}; +} + +std::vector<RwLock::SharedLockScope> +ProjectStore::Project::GetGcReferencerLocks() +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectLock)); + Locks.reserve(1 + m_Oplogs.size()); + for (auto& Kv : m_Oplogs) + { + Locks.emplace_back(Kv.second->GetGcReferencerLock()); + } + return Locks; +} + bool ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&, const std::string& EntryName, @@ -2221,11 +2290,13 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa m_Gc.AddGcContributor(this); m_Gc.AddGcStorage(this); m_Gc.AddGcReferencer(*this); + m_Gc.AddGcReferenceLocker(*this); } ProjectStore::~ProjectStore() { ZEN_INFO("closing project store at '{}'", m_ProjectBasePath); + m_Gc.RemoveGcReferenceLocker(*this); m_Gc.RemoveGcReferencer(*this); m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcContributor(this); @@ -2559,6 +2630,13 @@ ProjectStore::NewProject(const std::filesystem::path& BasePath, Prj->ProjectFilePath = ProjectFilePath; Prj->Write(); + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_CapturedProjects) + { + m_CapturedProjects->push_back(std::string(ProjectId)); + } + }); + return Prj; } @@ -3753,6 +3831,48 @@ ProjectStore::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +void +ProjectStore::EnableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + if (m_UpdateCaptureRefCounter == 0) + { + ZEN_ASSERT(!m_CapturedProjects); + m_CapturedProjects = std::make_unique<std::vector<std::string>>(); + } + else + { + ZEN_ASSERT(m_CapturedProjects); + } + m_UpdateCaptureRefCounter++; + }); +} + +void +ProjectStore::DisableUpdateCapture() +{ + m_UpdateCaptureLock.WithExclusiveLock([&]() { + ZEN_ASSERT(m_CapturedProjects); + ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); + m_UpdateCaptureRefCounter--; + if (m_UpdateCaptureRefCounter == 0) + { + m_CapturedProjects.reset(); + } + }); +} + +std::vector<std::string> +ProjectStore::GetCapturedProjects() +{ + RwLock::SharedLockScope _(m_UpdateCaptureLock); + if (m_CapturedProjects) + { + return *m_CapturedProjects; + } + return {}; +} + std::string ProjectStore::GetGcName(GcCtx&) { @@ -3988,17 +4108,13 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) class ProjectStoreReferenceChecker : public GcReferenceChecker { public: - ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner) : m_Oplog(Owner) {} + ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); } virtual ~ProjectStoreReferenceChecker() { try { - m_OplogLock.reset(); - if (m_OplogCaptureEnabled) - { - m_Oplog.DisableUpdateCapture(); - } + m_ProjectStore.DisableUpdateCapture(); } catch (const std::exception& Ex) { @@ -4006,42 +4122,190 @@ public: } } - virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); } + virtual std::string GetGcName(GcCtx&) override { return "projectstore"; } + virtual void PreCache(GcCtx&) override {} - virtual void PreCache(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Store::PreCache"); + ZEN_TRACE_CPU("Store::UpdateLockedState"); + + Stopwatch Timer; + + std::vector<ProjectStore::Oplog*> AddedOplogs; - Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) { return; } - ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", - m_Oplog.m_BasePath, + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs", + "projectstore", m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Oplog.m_OuterProject->Identifier, - m_Oplog.OplogId()); + AddedOplogs.size()); + }); + + std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjects(); + for (const std::string& AddedProject : AddedProjects) + { + if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end()) + { + ProjectStore::Project& Project = *It->second; + for (auto& OplogPair : Project.m_Oplogs) + { + ProjectStore::Oplog* Oplog = OplogPair.second.get(); + AddedOplogs.push_back(Oplog); + } + } + } + for (auto& ProjectPair : m_ProjectStore.m_Projects) + { + ProjectStore::Project& Project = *ProjectPair.second; + std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogs()); + for (const std::string& OplogName : AddedOplogNames) + { + if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end()) + { + ProjectStore::Oplog* Oplog = It->second.get(); + AddedOplogs.push_back(Oplog); + } + } + } + + for (ProjectStore::Oplog* Oplog : AddedOplogs) + { + size_t BaseReferenceCount = m_References.size(); + + Stopwatch InnerTimer; + const auto __ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}", + Oplog->m_BasePath, + m_References.size() - BaseReferenceCount, + NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()), + Oplog->OplogId()); + }); + + Oplog->IterateOplogLocked([&](const CbObjectView& UpdateOp) -> bool { + UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + return true; + }); + } + } + + virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + { + ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet"); + + size_t InitialCount = IoCids.size(); + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", + "projectstore", + InitialCount - IoCids.size(), + InitialCount, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - m_Oplog.EnableUpdateCapture(); - m_OplogCaptureEnabled = true; + for (const IoHash& ReferenceHash : m_References) + { + if (IoCids.erase(ReferenceHash) == 1) + { + if (IoCids.empty()) + { + return; + } + } + } + } - RwLock::SharedLockScope __(m_Oplog.m_OplogLock); - if (Ctx.IsCancelledFlag) +private: + ProjectStore& m_ProjectStore; + std::vector<IoHash> m_References; +}; + +class ProjectStoreOplogReferenceChecker : public GcReferenceChecker +{ +public: + ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog) + : m_ProjectStore(InProjectStore) + , m_Project(InProject) + , m_OplogName(InOplog) + { + m_Project->EnableUpdateCapture(); + } + + virtual ~ProjectStoreOplogReferenceChecker() + { + try { - return; + m_Project->DisableUpdateCapture(); + if (m_OplogCaptureEnabled) + { + ZEN_ASSERT(m_Oplog); + m_Oplog->DisableUpdateCapture(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what()); } - m_Oplog.IterateOplog([&](CbObjectView Op) { - Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + } + + virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogName); } + + virtual void PreCache(GcCtx& Ctx) override + { + ZEN_TRACE_CPU("Store::Oplog::PreCache"); + + Stopwatch Timer; + const auto _ = MakeGuard([&] { + if (!Ctx.Settings.Verbose) + { + return; + } + if (m_Oplog) + { + ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}", + m_Oplog->m_BasePath, + m_References.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs()), + m_Project->Identifier, + m_Oplog->OplogId()); + } }); + + if (auto It = m_Project->m_Oplogs.find(m_OplogName); It != m_Project->m_Oplogs.end()) + { + m_Oplog = It->second.get(); + m_Oplog->EnableUpdateCapture(); + m_OplogCaptureEnabled = true; + + RwLock::SharedLockScope __(m_Oplog->m_OplogLock); + if (Ctx.IsCancelledFlag) + { + return; + } + m_Oplog->IterateOplog([&](CbObjectView Op) { + Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); + }); + } } - virtual void LockState(GcCtx& Ctx) override + virtual void UpdateLockedState(GcCtx& Ctx) override { - ZEN_TRACE_CPU("Store::LockState"); + ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState"); + if (!m_Oplog) + { + return; + } Stopwatch Timer; const auto _ = MakeGuard([&] { @@ -4050,25 +4314,28 @@ public: return; } ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}", - m_Oplog.m_BasePath, + m_Oplog->m_BasePath, m_References.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs()), - m_Oplog.m_OuterProject->Identifier, - m_Oplog.OplogId()); + m_Project->Identifier, + m_Oplog->OplogId()); }); - m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock); - m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool { + m_Oplog->IterateCapturedLSNs([&](const CbObjectView& UpdateOp) -> bool { UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); }); return true; }); + std::vector<IoHash> AddedAttachments = m_Oplog->GetCapturedAttachments(); + m_References.insert(m_References.end(), AddedAttachments.begin(), AddedAttachments.end()); } virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override { - ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet"); - - ZEN_ASSERT(m_OplogLock); + ZEN_TRACE_CPU("Store::Oplog::RemoveUsedReferencesFromSet"); + if (!m_Oplog) + { + return; + } size_t InitialCount = IoCids.size(); Stopwatch Timer; @@ -4078,7 +4345,7 @@ public: return; } ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", - m_Oplog.m_BasePath, + m_Oplog->m_BasePath, InitialCount - IoCids.size(), InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); @@ -4094,21 +4361,13 @@ public: } } } - m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool { - if (IoCids.erase(RawHash) == 1) - { - if (IoCids.empty()) - { - return false; - } - } - return true; - }); } - ProjectStore::Oplog& m_Oplog; - std::unique_ptr<RwLock::SharedLockScope> m_OplogLock; - std::vector<IoHash> m_References; - bool m_OplogCaptureEnabled = false; + ProjectStore& m_ProjectStore; + Ref<ProjectStore::Project> m_Project; + std::string m_OplogName; + ProjectStore::Oplog* m_Oplog = nullptr; + std::vector<IoHash> m_References; + bool m_OplogCaptureEnabled = false; }; std::vector<GcReferenceChecker*> @@ -4135,6 +4394,8 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) DiscoverProjects(); std::vector<Ref<ProjectStore::Project>> Projects; + std::vector<GcReferenceChecker*> Checkers; + Checkers.emplace_back(new ProjectStoreReferenceChecker(*this)); { RwLock::SharedLockScope Lock(m_ProjectsLock); Projects.reserve(m_Projects.size()); @@ -4145,21 +4406,15 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) } } ProjectCount += Projects.size(); - std::vector<GcReferenceChecker*> Checkers; try { for (const Ref<ProjectStore::Project>& Project : Projects) { std::vector<std::string> OpLogs = Project->ScanForOplogs(); - Checkers.reserve(OpLogs.size()); + Checkers.reserve(Checkers.size() + OpLogs.size()); for (const std::string& OpLogId : OpLogs) { - ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); - if (Oplog == nullptr) - { - continue; - } - Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog)); + Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId)); OplogCount++; } } @@ -4177,6 +4432,22 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) return Checkers; } +std::vector<RwLock::SharedLockScope> +ProjectStore::LockState(GcCtx&) +{ + std::vector<RwLock::SharedLockScope> Locks; + Locks.emplace_back(RwLock::SharedLockScope(m_ProjectsLock)); + for (auto& ProjectIt : m_Projects) + { + std::vector<RwLock::SharedLockScope> ProjectLocks = ProjectIt.second->GetGcReferencerLocks(); + for (auto It = std::make_move_iterator(ProjectLocks.begin()); It != std::make_move_iterator(ProjectLocks.end()); It++) + { + Locks.emplace_back(std::move(*It)); + } + } + return Locks; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index fd8443660..1b72a2688 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -58,7 +58,7 @@ static_assert(IsPow2(sizeof(OplogEntry))); package data split into separate chunks for bulk data, exports and header information. */ -class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer +class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer, public GcReferenceLocker { struct OplogStorage; @@ -95,6 +95,7 @@ public: void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObjectView)>&& Fn); void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn); + void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn); size_t GetOplogEntryCount() const; std::optional<CbObject> GetOpByKey(const Oid& Key); @@ -149,13 +150,14 @@ public: void AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid::Hasher>& ChunkMappings); - void CaptureUpdatedLSN(RwLock::ExclusiveLockScope& OplogLock, uint32_t LSN); + void CaptureUpdatedLSNs(std::span<const uint32_t> LSNs); void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes); - void EnableUpdateCapture(); - void DisableUpdateCapture(); - void IterateUpdatedLSNs(RwLock::SharedLockScope& OplogLock, std::function<bool(const CbObjectView& UpdateOp)>&& Callback); - void IterateAddedAttachments(RwLock::SharedLockScope& OplogLock, std::function<bool(const IoHash& RawHash)>&& Callback); + void EnableUpdateCapture(); + void DisableUpdateCapture(); + void IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback); + std::vector<IoHash> GetCapturedAttachments(); + RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); } private: struct FileMapEntry @@ -181,12 +183,13 @@ public: tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key - uint32_t m_UpdateCaptureRefCounter = 0; - std::unique_ptr<std::vector<int>> m_UpdatedLSNs; - std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<uint32_t>> m_CapturedLSNs; + std::unique_ptr<std::vector<IoHash>> m_CapturedAttachments; RefPtr<OplogStorage> m_Storage; - std::string m_OplogId; + const std::string m_OplogId; RefPtr<OplogStorage> GetStorage(); @@ -233,6 +236,7 @@ public: void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash); + friend class ProjectStoreOplogReferenceChecker; friend class ProjectStoreReferenceChecker; }; @@ -272,6 +276,12 @@ public: uint64_t TotalSize() const; bool PrepareForDelete(std::filesystem::path& OutDeletePath); + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedOplogs(); + + std::vector<RwLock::SharedLockScope> GetGcReferencerLocks(); + private: ProjectStore* m_ProjectStore; CidStore& m_CidStore; @@ -280,6 +290,9 @@ public: std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs; std::filesystem::path m_OplogStoragePath; mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedOplogs; std::filesystem::path BasePathForOplog(std::string_view OplogId); bool IsExpired(const RwLock::SharedLockScope&, @@ -288,6 +301,9 @@ public: const GcClock::TimePoint ExpireTime); void WriteAccessTimes(); void ReadAccessTimes(); + + friend class ProjectStoreOplogReferenceChecker; + friend class ProjectStoreReferenceChecker; }; // Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId); @@ -323,6 +339,8 @@ public: virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override; + CbArray GetProjectsList(); std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId, const std::string_view OplogId, @@ -392,19 +410,28 @@ public: bool AreDiskWritesAllowed() const; + void EnableUpdateCapture(); + void DisableUpdateCapture(); + std::vector<std::string> GetCapturedProjects(); + private: - LoggerRef m_Log; - GcManager& m_Gc; - CidStore& m_CidStore; - JobQueue& m_JobQueue; - std::filesystem::path m_ProjectBasePath; - mutable RwLock m_ProjectsLock; - std::map<std::string, Ref<Project>> m_Projects; - const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + LoggerRef m_Log; + GcManager& m_Gc; + CidStore& m_CidStore; + JobQueue& m_JobQueue; + std::filesystem::path m_ProjectBasePath; + mutable RwLock m_ProjectsLock; + std::map<std::string, Ref<Project>> m_Projects; + const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; + mutable RwLock m_UpdateCaptureLock; + uint32_t m_UpdateCaptureRefCounter = 0; + std::unique_ptr<std::vector<std::string>> m_CapturedProjects; std::filesystem::path BasePathForProject(std::string_view ProjectId); friend class ProjectStoreGcStoreCompactor; + friend class ProjectStoreOplogReferenceChecker; + friend class ProjectStoreReferenceChecker; }; void prj_forcelink(); |