aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-13 08:53:01 +0200
committerGitHub Enterprise <[email protected]>2024-06-13 08:53:01 +0200
commitb71d52375e41a084e661d0f55f044ca8982312a4 (patch)
treef44e74a13e29d50ab36d1eebfdb4c7e606d879a9
parent5.5.3-pre1 (diff)
downloadzen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz
zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip
Make sure we monitor for new project, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2 - Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
-rw-r--r--CHANGELOG.md2
-rw-r--r--src/zencore/include/zencore/thread.h10
-rw-r--r--src/zenserver/projectstore/projectstore.cpp469
-rw-r--r--src/zenserver/projectstore/projectstore.h63
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp305
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp235
-rw-r--r--src/zenstore/gc.cpp124
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h12
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h25
-rw-r--r--src/zenstore/include/zenstore/gc.h45
10 files changed, 998 insertions, 292 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4461ccc0e..38024fed0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -77,6 +77,8 @@
- `--alias` - alias name of the share - replaces `--workspace` and `--share` options
- `--chunks` - the chunk ids for the chunk or the share local paths for the chunk
- Bugfix: Removed test data added at current folder when running test
+- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
+- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
- Improvement: Various minor optimizations in cache package formatting
- Improvement: Add batch fetch of cache values in the GetCacheValues request
- Improvement: Use a smaller thread pool for network operations when doing oplog import to reduce risk of NIC/router failure
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h
index bae630db9..9362802a1 100644
--- a/src/zencore/include/zencore/thread.h
+++ b/src/zencore/include/zencore/thread.h
@@ -35,6 +35,16 @@ public:
struct SharedLockScope
{
+ SharedLockScope(const SharedLockScope& Rhs) = delete;
+        // Move constructor: takes over the lock pointer held by Rhs and clears it in Rhs, so only one scope refers to the lock.
+        // Only pointer assignments happen here, so the constructor is marked noexcept.
+        SharedLockScope(SharedLockScope&& Rhs) noexcept : m_Lock(Rhs.m_Lock) { Rhs.m_Lock = nullptr; }
+        // Move assignment: calls ReleaseNow() on this scope first (presumably dropping any lock it currently holds),
+        // then takes over the lock pointer from Rhs and clears it in Rhs.
+        // Self-assignment is a no-op; without the guard the scope would run ReleaseNow() on itself and end up holding nothing.
+        SharedLockScope& operator=(SharedLockScope&& Rhs) noexcept
+        {
+            if (this != &Rhs)
+            {
+                ReleaseNow();
+                m_Lock     = Rhs.m_Lock;
+                Rhs.m_Lock = nullptr;
+            }
+            return *this;
+        }
+ SharedLockScope& operator=(const SharedLockScope& Rhs) = delete;
SharedLockScope(RwLock& Lock) : m_Lock(&Lock) { Lock.AcquireShared(); }
~SharedLockScope() { ReleaseNow(); }
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 4c434c39b..beac1ab97 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1094,6 +1094,12 @@ void
ProjectStore::Oplog::IterateOplog(std::function<void(CbObjectView)>&& Handler)
{
RwLock::SharedLockScope _(m_OplogLock);
+ IterateOplogLocked(std::move(Handler));
+}
+
+void
+ProjectStore::Oplog::IterateOplogLocked(std::function<void(CbObjectView)>&& Handler)
+{
if (!m_Storage)
{
return;
@@ -1264,22 +1270,25 @@ ProjectStore::Oplog::AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid:
}
void
-ProjectStore::Oplog::CaptureUpdatedLSN(RwLock::ExclusiveLockScope&, uint32_t LSN)
+ProjectStore::Oplog::CaptureUpdatedLSNs(std::span<const uint32_t> LSNs)
{
- if (m_UpdatedLSNs)
- {
- m_UpdatedLSNs->push_back(LSN);
- }
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedLSNs)
+ {
+ m_CapturedLSNs->reserve(m_CapturedLSNs->size() + LSNs.size());
+ m_CapturedLSNs->insert(m_CapturedLSNs->end(), LSNs.begin(), LSNs.end());
+ }
+ });
}
void
ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes)
{
- m_OplogLock.WithExclusiveLock([this, AttachmentHashes]() {
- if (m_NonGCAttachments)
+ m_UpdateCaptureLock.WithExclusiveLock([this, AttachmentHashes]() {
+ if (m_CapturedAttachments)
{
- m_NonGCAttachments->reserve(m_NonGCAttachments->size() + AttachmentHashes.size());
- m_NonGCAttachments->insert(m_NonGCAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end());
+ m_CapturedAttachments->reserve(m_CapturedAttachments->size() + AttachmentHashes.size());
+ m_CapturedAttachments->insert(m_CapturedAttachments->end(), AttachmentHashes.begin(), AttachmentHashes.end());
}
});
}
@@ -1287,18 +1296,18 @@ ProjectStore::Oplog::CaptureAddedAttachments(std::span<const IoHash> AttachmentH
void
ProjectStore::Oplog::EnableUpdateCapture()
{
- m_OplogLock.WithExclusiveLock([&]() {
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
if (m_UpdateCaptureRefCounter == 0)
{
- ZEN_ASSERT(!m_UpdatedLSNs);
- ZEN_ASSERT(!m_NonGCAttachments);
- m_UpdatedLSNs = std::make_unique<std::vector<int>>();
- m_NonGCAttachments = std::make_unique<std::vector<IoHash>>();
+ ZEN_ASSERT(!m_CapturedLSNs);
+ ZEN_ASSERT(!m_CapturedAttachments);
+ m_CapturedLSNs = std::make_unique<std::vector<uint32_t>>();
+ m_CapturedAttachments = std::make_unique<std::vector<IoHash>>();
}
else
{
- ZEN_ASSERT(m_UpdatedLSNs);
- ZEN_ASSERT(m_NonGCAttachments);
+ ZEN_ASSERT(m_CapturedLSNs);
+ ZEN_ASSERT(m_CapturedAttachments);
}
m_UpdateCaptureRefCounter++;
});
@@ -1307,51 +1316,49 @@ ProjectStore::Oplog::EnableUpdateCapture()
void
ProjectStore::Oplog::DisableUpdateCapture()
{
- m_OplogLock.WithExclusiveLock([&]() {
- ZEN_ASSERT(m_UpdatedLSNs);
- ZEN_ASSERT(m_NonGCAttachments);
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ ZEN_ASSERT(m_CapturedLSNs);
+ ZEN_ASSERT(m_CapturedAttachments);
ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
m_UpdateCaptureRefCounter--;
if (m_UpdateCaptureRefCounter == 0)
{
- m_UpdatedLSNs.reset();
- m_NonGCAttachments.reset();
+ m_CapturedLSNs.reset();
+ m_CapturedAttachments.reset();
}
});
}
void
-ProjectStore::Oplog::IterateUpdatedLSNs(RwLock::SharedLockScope&, std::function<bool(const CbObjectView& UpdateOp)>&& Callback)
+ProjectStore::Oplog::IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback)
{
- if (m_UpdatedLSNs)
- {
- if (!m_Storage)
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedLSNs)
{
- return;
- }
- for (int UpdatedLSN : *m_UpdatedLSNs)
- {
- if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end())
+ if (!m_Storage)
{
- Callback(m_Storage->GetOp(AddressEntryIt->second));
+ return;
+ }
+ for (int UpdatedLSN : *m_CapturedLSNs)
+ {
+ if (const auto AddressEntryIt = m_OpAddressMap.find(UpdatedLSN); AddressEntryIt != m_OpAddressMap.end())
+ {
+ Callback(m_Storage->GetOp(AddressEntryIt->second));
+ }
}
}
- }
+ });
}
-void
-ProjectStore::Oplog::IterateAddedAttachments(RwLock::SharedLockScope&, std::function<bool(const IoHash& RawHash)>&& Callback)
+std::vector<IoHash>
+ProjectStore::Oplog::GetCapturedAttachments()
{
- if (m_NonGCAttachments)
+ RwLock::SharedLockScope _(m_UpdateCaptureLock);
+ if (m_CapturedAttachments)
{
- for (const IoHash& ReferenceHash : *m_NonGCAttachments)
- {
- if (!Callback(ReferenceHash))
- {
- break;
- }
- }
+ return *m_CapturedAttachments;
}
+ return {};
}
void
@@ -1511,9 +1518,6 @@ ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
m_LatestOpMap[OpEntry.OpKeyHash] = OpEntry.OpLsn;
-
- CaptureUpdatedLSN(OplogLock, OpEntry.OpLsn);
-
return OpEntry.OpLsn;
}
@@ -1605,6 +1609,7 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObjectView Core)
RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry);
+ CaptureUpdatedLSNs(std::array<uint32_t, 1u>({EntryId}));
return EntryId;
}
@@ -1645,6 +1650,7 @@ ProjectStore::Oplog::AppendNewOplogEntries(std::span<CbObjectView> Cores)
{
EntryIds[OpIndex] = RegisterOplogEntry(OplogLock, Mappings[OpIndex], OpEntries[OpIndex]);
}
+ CaptureUpdatedLSNs(EntryIds);
}
return EntryIds;
}
@@ -1868,6 +1874,14 @@ ProjectStore::Project::NewOplog(std::string_view OplogId, const std::filesystem:
.first->second.get();
Log->Write();
+
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedOplogs)
+ {
+ m_CapturedOplogs->push_back(std::string(OplogId));
+ }
+ });
+
return Log;
}
catch (const std::exception&)
@@ -2125,6 +2139,61 @@ ProjectStore::Project::PrepareForDelete(std::filesystem::path& OutDeletePath)
return true;
}
+// Increments the capture ref count under m_UpdateCaptureLock. The first enable (count == 0) allocates
+// the m_CapturedOplogs list; later enables only assert that the list is already there.
+void
+ProjectStore::Project::EnableUpdateCapture()
+{
+    m_UpdateCaptureLock.WithExclusiveLock([this]() {
+        if (m_UpdateCaptureRefCounter != 0)
+        {
+            // Capture is already active for another user - the list must exist
+            ZEN_ASSERT(m_CapturedOplogs);
+        }
+        else
+        {
+            ZEN_ASSERT(!m_CapturedOplogs);
+            m_CapturedOplogs = std::make_unique<std::vector<std::string>>();
+        }
+        m_UpdateCaptureRefCounter += 1;
+    });
+}
+
+// Decrements the capture ref count under m_UpdateCaptureLock and frees the m_CapturedOplogs list
+// when the count reaches zero. Asserts that capture is currently enabled.
+void
+ProjectStore::Project::DisableUpdateCapture()
+{
+    m_UpdateCaptureLock.WithExclusiveLock([this]() {
+        ZEN_ASSERT(m_CapturedOplogs);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+        if (--m_UpdateCaptureRefCounter != 0)
+        {
+            // Still in use by another capture user
+            return;
+        }
+        m_CapturedOplogs.reset();
+    });
+}
+
+// Returns a copy of the oplog names captured so far, taken under a shared lock on m_UpdateCaptureLock.
+// Returns an empty vector when no capture list exists.
+std::vector<std::string>
+ProjectStore::Project::GetCapturedOplogs()
+{
+    RwLock::SharedLockScope CaptureLock(m_UpdateCaptureLock);
+    if (!m_CapturedOplogs)
+    {
+        return {};
+    }
+    return *m_CapturedOplogs;
+}
+
+// Returns a shared lock on m_ProjectLock followed by one GC referencer lock per entry in m_Oplogs.
+// The project lock is taken before m_Oplogs is touched, which is why reserve() comes after the first emplace_back.
+std::vector<RwLock::SharedLockScope>
+ProjectStore::Project::GetGcReferencerLocks()
+{
+    std::vector<RwLock::SharedLockScope> Result;
+    Result.emplace_back(RwLock::SharedLockScope(m_ProjectLock));
+    Result.reserve(m_Oplogs.size() + 1);
+    for (auto& OplogEntry : m_Oplogs)
+    {
+        Result.emplace_back(OplogEntry.second->GetGcReferencerLock());
+    }
+    return Result;
+}
+
bool
ProjectStore::Project::IsExpired(const RwLock::SharedLockScope&,
const std::string& EntryName,
@@ -2221,11 +2290,13 @@ ProjectStore::ProjectStore(CidStore& Store, std::filesystem::path BasePath, GcMa
m_Gc.AddGcContributor(this);
m_Gc.AddGcStorage(this);
m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcReferenceLocker(*this);
}
ProjectStore::~ProjectStore()
{
ZEN_INFO("closing project store at '{}'", m_ProjectBasePath);
+ m_Gc.RemoveGcReferenceLocker(*this);
m_Gc.RemoveGcReferencer(*this);
m_Gc.RemoveGcStorage(this);
m_Gc.RemoveGcContributor(this);
@@ -2559,6 +2630,13 @@ ProjectStore::NewProject(const std::filesystem::path& BasePath,
Prj->ProjectFilePath = ProjectFilePath;
Prj->Write();
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedProjects)
+ {
+ m_CapturedProjects->push_back(std::string(ProjectId));
+ }
+ });
+
return Prj;
}
@@ -3753,6 +3831,48 @@ ProjectStore::AreDiskWritesAllowed() const
return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed());
}
+// Increments the capture ref count under m_UpdateCaptureLock. The first enable (count == 0) allocates
+// the m_CapturedProjects list; later enables only assert that the list is already there.
+void
+ProjectStore::EnableUpdateCapture()
+{
+    m_UpdateCaptureLock.WithExclusiveLock([this]() {
+        if (m_UpdateCaptureRefCounter != 0)
+        {
+            // Capture is already active for another user - the list must exist
+            ZEN_ASSERT(m_CapturedProjects);
+        }
+        else
+        {
+            ZEN_ASSERT(!m_CapturedProjects);
+            m_CapturedProjects = std::make_unique<std::vector<std::string>>();
+        }
+        m_UpdateCaptureRefCounter += 1;
+    });
+}
+
+// Decrements the capture ref count under m_UpdateCaptureLock and frees the m_CapturedProjects list
+// when the count reaches zero. Asserts that capture is currently enabled.
+void
+ProjectStore::DisableUpdateCapture()
+{
+    m_UpdateCaptureLock.WithExclusiveLock([this]() {
+        ZEN_ASSERT(m_CapturedProjects);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+        if (--m_UpdateCaptureRefCounter != 0)
+        {
+            // Still in use by another capture user
+            return;
+        }
+        m_CapturedProjects.reset();
+    });
+}
+
+// Returns a copy of the project ids captured so far, taken under a shared lock on m_UpdateCaptureLock.
+// Returns an empty vector when no capture list exists.
+std::vector<std::string>
+ProjectStore::GetCapturedProjects()
+{
+    RwLock::SharedLockScope CaptureLock(m_UpdateCaptureLock);
+    if (!m_CapturedProjects)
+    {
+        return {};
+    }
+    return *m_CapturedProjects;
+}
+
std::string
ProjectStore::GetGcName(GcCtx&)
{
@@ -3988,17 +4108,13 @@ ProjectStore::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
class ProjectStoreReferenceChecker : public GcReferenceChecker
{
public:
- ProjectStoreReferenceChecker(ProjectStore::Oplog& Owner) : m_Oplog(Owner) {}
+ ProjectStoreReferenceChecker(ProjectStore& InProjectStore) : m_ProjectStore(InProjectStore) { m_ProjectStore.EnableUpdateCapture(); }
virtual ~ProjectStoreReferenceChecker()
{
try
{
- m_OplogLock.reset();
- if (m_OplogCaptureEnabled)
- {
- m_Oplog.DisableUpdateCapture();
- }
+ m_ProjectStore.DisableUpdateCapture();
}
catch (const std::exception& Ex)
{
@@ -4006,42 +4122,190 @@ public:
}
}
- virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}'", m_Oplog.m_BasePath); }
+ virtual std::string GetGcName(GcCtx&) override { return "projectstore"; }
+ virtual void PreCache(GcCtx&) override {}
- virtual void PreCache(GcCtx& Ctx) override
+ virtual void UpdateLockedState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Store::PreCache");
+ ZEN_TRACE_CPU("Store::UpdateLockedState");
+
+ Stopwatch Timer;
+
+ std::vector<ProjectStore::Oplog*> AddedOplogs;
- Stopwatch Timer;
const auto _ = MakeGuard([&] {
if (!Ctx.Settings.Verbose)
{
return;
}
- ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}",
- m_Oplog.m_BasePath,
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} in {} new oplogs",
+ "projectstore",
m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- m_Oplog.m_OuterProject->Identifier,
- m_Oplog.OplogId());
+ AddedOplogs.size());
+ });
+
+ std::vector<std::string> AddedProjects = m_ProjectStore.GetCapturedProjects();
+ for (const std::string& AddedProject : AddedProjects)
+ {
+ if (auto It = m_ProjectStore.m_Projects.find(AddedProject); It != m_ProjectStore.m_Projects.end())
+ {
+ ProjectStore::Project& Project = *It->second;
+ for (auto& OplogPair : Project.m_Oplogs)
+ {
+ ProjectStore::Oplog* Oplog = OplogPair.second.get();
+ AddedOplogs.push_back(Oplog);
+ }
+ }
+ }
+ for (auto& ProjectPair : m_ProjectStore.m_Projects)
+ {
+ ProjectStore::Project& Project = *ProjectPair.second;
+ std::vector<std::string> AddedOplogNames(Project.GetCapturedOplogs());
+ for (const std::string& OplogName : AddedOplogNames)
+ {
+ if (auto It = Project.m_Oplogs.find(OplogName); It != Project.m_Oplogs.end())
+ {
+ ProjectStore::Oplog* Oplog = It->second.get();
+ AddedOplogs.push_back(Oplog);
+ }
+ }
+ }
+
+ for (ProjectStore::Oplog* Oplog : AddedOplogs)
+ {
+ size_t BaseReferenceCount = m_References.size();
+
+ Stopwatch InnerTimer;
+ const auto __ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}",
+ Oplog->m_BasePath,
+ m_References.size() - BaseReferenceCount,
+ NiceTimeSpanMs(InnerTimer.GetElapsedTimeMs()),
+ Oplog->OplogId());
+ });
+
+ Oplog->IterateOplogLocked([&](const CbObjectView& UpdateOp) -> bool {
+ UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ return true;
+ });
+ }
+ }
+
+ virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
+ {
+ ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet");
+
+ size_t InitialCount = IoCids.size();
+ Stopwatch Timer;
+ const auto _ = MakeGuard([&] {
+ if (!Ctx.Settings.Verbose)
+ {
+ return;
+ }
+ ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+ "projectstore",
+ InitialCount - IoCids.size(),
+ InitialCount,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_Oplog.EnableUpdateCapture();
- m_OplogCaptureEnabled = true;
+ for (const IoHash& ReferenceHash : m_References)
+ {
+ if (IoCids.erase(ReferenceHash) == 1)
+ {
+ if (IoCids.empty())
+ {
+ return;
+ }
+ }
+ }
+ }
- RwLock::SharedLockScope __(m_Oplog.m_OplogLock);
- if (Ctx.IsCancelledFlag)
+private:
+ ProjectStore& m_ProjectStore;
+ std::vector<IoHash> m_References;
+};
+
+class ProjectStoreOplogReferenceChecker : public GcReferenceChecker
+{
+public:
+ ProjectStoreOplogReferenceChecker(ProjectStore& InProjectStore, Ref<ProjectStore::Project> InProject, std::string_view InOplog)
+ : m_ProjectStore(InProjectStore)
+ , m_Project(InProject)
+ , m_OplogName(InOplog)
+ {
+ m_Project->EnableUpdateCapture();
+ }
+
+ virtual ~ProjectStoreOplogReferenceChecker()
+ {
+ try
{
- return;
+ m_Project->DisableUpdateCapture();
+ if (m_OplogCaptureEnabled)
+ {
+ ZEN_ASSERT(m_Oplog);
+ m_Oplog->DisableUpdateCapture();
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("~ProjectStoreOplogReferenceChecker threw exception: '{}'", Ex.what());
}
- m_Oplog.IterateOplog([&](CbObjectView Op) {
- Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+ }
+
+ virtual std::string GetGcName(GcCtx&) override { return fmt::format("oplog: '{}/{}'", m_Project->Identifier, m_OplogName); }
+
+    // Looks up the oplog named m_OplogName in the project, enables update capture on it and appends the
+    // attachment hashes of all its ops to m_References. Does nothing if the oplog is not found in m_Oplogs.
+    // NOTE(review): only oplogs already present in m_Project->m_Oplogs are handled here - confirm that oplogs
+    // reported by ScanForOplogs() but not yet opened are covered elsewhere.
+    virtual void PreCache(GcCtx& Ctx) override
+    {
+        ZEN_TRACE_CPU("Store::Oplog::PreCache");
+
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            if (!Ctx.Settings.Verbose)
+            {
+                return;
+            }
+            if (m_Oplog)
+            {
+                ZEN_INFO("GCV2: projectstore [PRECACHE] '{}': precached {} references in {} from {}/{}",
+                         m_Oplog->m_BasePath,
+                         m_References.size(),
+                         NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+                         m_Project->Identifier,
+                         m_Oplog->OplogId());
+            }
         });
+
+        if (auto It = m_Project->m_Oplogs.find(m_OplogName); It != m_Project->m_Oplogs.end())
+        {
+            m_Oplog = It->second.get();
+            m_Oplog->EnableUpdateCapture();
+            m_OplogCaptureEnabled = true;
+
+            RwLock::SharedLockScope __(m_Oplog->m_OplogLock);
+            if (Ctx.IsCancelledFlag)
+            {
+                return;
+            }
+            // m_OplogLock is already held (shared) by this scope, so use the "Locked" variant: IterateOplog()
+            // would acquire the same shared lock a second time on this thread.
+            m_Oplog->IterateOplogLocked([&](CbObjectView Op) {
+                Op.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
+            });
+        }
     }
- virtual void LockState(GcCtx& Ctx) override
+ virtual void UpdateLockedState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Store::LockState");
+ ZEN_TRACE_CPU("Store::Oplog::UpdateLockedState");
+ if (!m_Oplog)
+ {
+ return;
+ }
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -4050,25 +4314,28 @@ public:
return;
}
ZEN_INFO("GCV2: projectstore [LOCKSTATE] '{}': found {} references in {} from {}/{}",
- m_Oplog.m_BasePath,
+ m_Oplog->m_BasePath,
m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
- m_Oplog.m_OuterProject->Identifier,
- m_Oplog.OplogId());
+ m_Project->Identifier,
+ m_Oplog->OplogId());
});
- m_OplogLock = std::make_unique<RwLock::SharedLockScope>(m_Oplog.m_OplogLock);
- m_Oplog.IterateUpdatedLSNs(*m_OplogLock, [&](const CbObjectView& UpdateOp) -> bool {
+ m_Oplog->IterateCapturedLSNs([&](const CbObjectView& UpdateOp) -> bool {
UpdateOp.IterateAttachments([&](CbFieldView Visitor) { m_References.emplace_back(Visitor.AsAttachment()); });
return true;
});
+ std::vector<IoHash> AddedAttachments = m_Oplog->GetCapturedAttachments();
+ m_References.insert(m_References.end(), AddedAttachments.begin(), AddedAttachments.end());
}
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
- ZEN_TRACE_CPU("Store::RemoveUsedReferencesFromSet");
-
- ZEN_ASSERT(m_OplogLock);
+ ZEN_TRACE_CPU("Store::Oplog::RemoveUsedReferencesFromSet");
+ if (!m_Oplog)
+ {
+ return;
+ }
size_t InitialCount = IoCids.size();
Stopwatch Timer;
@@ -4078,7 +4345,7 @@ public:
return;
}
ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
- m_Oplog.m_BasePath,
+ m_Oplog->m_BasePath,
InitialCount - IoCids.size(),
InitialCount,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
@@ -4094,21 +4361,13 @@ public:
}
}
}
- m_Oplog.IterateAddedAttachments(*m_OplogLock, [&](const IoHash& RawHash) -> bool {
- if (IoCids.erase(RawHash) == 1)
- {
- if (IoCids.empty())
- {
- return false;
- }
- }
- return true;
- });
}
- ProjectStore::Oplog& m_Oplog;
- std::unique_ptr<RwLock::SharedLockScope> m_OplogLock;
- std::vector<IoHash> m_References;
- bool m_OplogCaptureEnabled = false;
+ ProjectStore& m_ProjectStore;
+ Ref<ProjectStore::Project> m_Project;
+ std::string m_OplogName;
+ ProjectStore::Oplog* m_Oplog = nullptr;
+ std::vector<IoHash> m_References;
+ bool m_OplogCaptureEnabled = false;
};
std::vector<GcReferenceChecker*>
@@ -4135,6 +4394,8 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
DiscoverProjects();
std::vector<Ref<ProjectStore::Project>> Projects;
+ std::vector<GcReferenceChecker*> Checkers;
+ Checkers.emplace_back(new ProjectStoreReferenceChecker(*this));
{
RwLock::SharedLockScope Lock(m_ProjectsLock);
Projects.reserve(m_Projects.size());
@@ -4145,21 +4406,15 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
}
}
ProjectCount += Projects.size();
- std::vector<GcReferenceChecker*> Checkers;
try
{
for (const Ref<ProjectStore::Project>& Project : Projects)
{
std::vector<std::string> OpLogs = Project->ScanForOplogs();
- Checkers.reserve(OpLogs.size());
+ Checkers.reserve(Checkers.size() + OpLogs.size());
for (const std::string& OpLogId : OpLogs)
{
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
- if (Oplog == nullptr)
- {
- continue;
- }
- Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog));
+ Checkers.emplace_back(new ProjectStoreOplogReferenceChecker(*this, Project, OpLogId));
OplogCount++;
}
}
@@ -4177,6 +4432,22 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
return Checkers;
}
+// Returns a shared lock on m_ProjectsLock followed by the GC referencer locks of every project in m_Projects.
+// The locks stay held for as long as the returned vector keeps them alive.
+std::vector<RwLock::SharedLockScope>
+ProjectStore::LockState(GcCtx&)
+{
+    std::vector<RwLock::SharedLockScope> Result;
+    Result.emplace_back(RwLock::SharedLockScope(m_ProjectsLock));
+    for (auto& ProjectEntry : m_Projects)
+    {
+        std::vector<RwLock::SharedLockScope> PerProject = ProjectEntry.second->GetGcReferencerLocks();
+        for (RwLock::SharedLockScope& Held : PerProject)
+        {
+            // Transfer ownership of each lock scope into the combined list
+            Result.emplace_back(std::move(Held));
+        }
+    }
+    return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h
index fd8443660..1b72a2688 100644
--- a/src/zenserver/projectstore/projectstore.h
+++ b/src/zenserver/projectstore/projectstore.h
@@ -58,7 +58,7 @@ static_assert(IsPow2(sizeof(OplogEntry)));
package data split into separate chunks for bulk data, exports and header
information.
*/
-class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer
+class ProjectStore : public RefCounted, public GcStorage, public GcContributor, public GcReferencer, public GcReferenceLocker
{
struct OplogStorage;
@@ -95,6 +95,7 @@ public:
void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn);
void IterateOplog(std::function<void(CbObjectView)>&& Fn);
void IterateOplogWithKey(std::function<void(int, const Oid&, CbObjectView)>&& Fn);
+ void IterateOplogLocked(std::function<void(CbObjectView)>&& Fn);
size_t GetOplogEntryCount() const;
std::optional<CbObject> GetOpByKey(const Oid& Key);
@@ -149,13 +150,14 @@ public:
void AddChunkMappings(const std::unordered_map<Oid, IoHash, Oid::Hasher>& ChunkMappings);
- void CaptureUpdatedLSN(RwLock::ExclusiveLockScope& OplogLock, uint32_t LSN);
+ void CaptureUpdatedLSNs(std::span<const uint32_t> LSNs);
void CaptureAddedAttachments(std::span<const IoHash> AttachmentHashes);
- void EnableUpdateCapture();
- void DisableUpdateCapture();
- void IterateUpdatedLSNs(RwLock::SharedLockScope& OplogLock, std::function<bool(const CbObjectView& UpdateOp)>&& Callback);
- void IterateAddedAttachments(RwLock::SharedLockScope& OplogLock, std::function<bool(const IoHash& RawHash)>&& Callback);
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ void IterateCapturedLSNs(std::function<bool(const CbObjectView& UpdateOp)>&& Callback);
+ std::vector<IoHash> GetCapturedAttachments();
+ RwLock::SharedLockScope GetGcReferencerLock() { return RwLock::SharedLockScope(m_OplogLock); }
private:
struct FileMapEntry
@@ -181,12 +183,13 @@ public:
tsl::robin_map<int, OplogEntryAddress> m_OpAddressMap; // Index LSN -> op data in ops blob file
OidMap<int> m_LatestOpMap; // op key -> latest op LSN for key
- uint32_t m_UpdateCaptureRefCounter = 0;
- std::unique_ptr<std::vector<int>> m_UpdatedLSNs;
- std::unique_ptr<std::vector<IoHash>> m_NonGCAttachments;
+ mutable RwLock m_UpdateCaptureLock;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<uint32_t>> m_CapturedLSNs;
+ std::unique_ptr<std::vector<IoHash>> m_CapturedAttachments;
RefPtr<OplogStorage> m_Storage;
- std::string m_OplogId;
+ const std::string m_OplogId;
RefPtr<OplogStorage> GetStorage();
@@ -233,6 +236,7 @@ public:
void AddChunkMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
void AddMetaMapping(const RwLock::ExclusiveLockScope& OplogLock, const Oid& ChunkId, const IoHash& Hash);
+ friend class ProjectStoreOplogReferenceChecker;
friend class ProjectStoreReferenceChecker;
};
@@ -272,6 +276,12 @@ public:
uint64_t TotalSize() const;
bool PrepareForDelete(std::filesystem::path& OutDeletePath);
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ std::vector<std::string> GetCapturedOplogs();
+
+ std::vector<RwLock::SharedLockScope> GetGcReferencerLocks();
+
private:
ProjectStore* m_ProjectStore;
CidStore& m_CidStore;
@@ -280,6 +290,9 @@ public:
std::vector<std::unique_ptr<Oplog>> m_DeletedOplogs;
std::filesystem::path m_OplogStoragePath;
mutable tsl::robin_map<std::string, GcClock::Tick> m_LastAccessTimes;
+ mutable RwLock m_UpdateCaptureLock;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<std::string>> m_CapturedOplogs;
std::filesystem::path BasePathForOplog(std::string_view OplogId);
bool IsExpired(const RwLock::SharedLockScope&,
@@ -288,6 +301,9 @@ public:
const GcClock::TimePoint ExpireTime);
void WriteAccessTimes();
void ReadAccessTimes();
+
+ friend class ProjectStoreOplogReferenceChecker;
+ friend class ProjectStoreReferenceChecker;
};
// Oplog* OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId);
@@ -323,6 +339,8 @@ public:
virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+ virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override;
+
CbArray GetProjectsList();
std::pair<HttpResponseCode, std::string> GetProjectFiles(const std::string_view ProjectId,
const std::string_view OplogId,
@@ -392,19 +410,28 @@ public:
bool AreDiskWritesAllowed() const;
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ std::vector<std::string> GetCapturedProjects();
+
private:
- LoggerRef m_Log;
- GcManager& m_Gc;
- CidStore& m_CidStore;
- JobQueue& m_JobQueue;
- std::filesystem::path m_ProjectBasePath;
- mutable RwLock m_ProjectsLock;
- std::map<std::string, Ref<Project>> m_Projects;
- const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+ LoggerRef m_Log;
+ GcManager& m_Gc;
+ CidStore& m_CidStore;
+ JobQueue& m_JobQueue;
+ std::filesystem::path m_ProjectBasePath;
+ mutable RwLock m_ProjectsLock;
+ std::map<std::string, Ref<Project>> m_Projects;
+ const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
+ mutable RwLock m_UpdateCaptureLock;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<std::string>> m_CapturedProjects;
std::filesystem::path BasePathForProject(std::string_view ProjectId);
friend class ProjectStoreGcStoreCompactor;
+ friend class ProjectStoreOplogReferenceChecker;
+ friend class ProjectStoreReferenceChecker;
};
void prj_forcelink();
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 9dd2e4a67..f865e1c3c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
}
+// Appends to OutReferences the attachment hashes found in every structured (kStructured) value of this bucket.
+// Inline values are read block by block through m_BlockStore; standalone values are read one file at a time.
+// m_Index is walked without taking m_IndexLock here - presumably the caller already holds it; verify at call sites.
+// Returns false if Ctx.IsCancelledFlag was observed set (OutReferences may then be partially filled), true otherwise.
+bool
+ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences)
+{
+    auto GetAttachments = [&](const void* CbObjectData) {
+        CbObjectView Obj(CbObjectData);
+        Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); });
+    };
+
+    std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
+    {
+        std::vector<BlockStoreLocation>       InlineLocations;
+        std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
+
+        {
+            // Maps a block index to its slot in InlineBlockChunkIndexes so chunks are grouped per block
+            std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
+
+            for (const auto& Entry : m_Index)
+            {
+                if (Ctx.IsCancelledFlag.load())
+                {
+                    return false;
+                }
+
+                PayloadIndex         EntryIndex = Entry.second;
+                const BucketPayload& Payload    = m_Payloads[EntryIndex];
+                const DiskLocation&  Loc        = Payload.Location;
+
+                if (!Loc.IsFlagSet(DiskLocation::kStructured))
+                {
+                    continue;
+                }
+                if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+                {
+                    StandaloneKeys.emplace_back(Entry.first, Loc);
+                    continue;
+                }
+
+                BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
+                size_t             ChunkIndex    = InlineLocations.size();
+                InlineLocations.push_back(ChunkLocation);
+                if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
+                {
+                    InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
+                }
+                else
+                {
+                    BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
+                    InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
+                }
+            }
+        }
+
+        // Iterate by const reference - the per-block index lists are only read here
+        for (const std::vector<std::size_t>& ChunkIndexes : InlineBlockChunkIndexes)
+        {
+            ZEN_ASSERT(!ChunkIndexes.empty());
+
+            bool Continue = m_BlockStore.IterateBlock(
+                InlineLocations,
+                ChunkIndexes,
+                [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+                    ZEN_UNUSED(ChunkIndex, Size);
+                    GetAttachments(Data);
+                    return !Ctx.IsCancelledFlag.load();
+                },
+                [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+                    ZEN_UNUSED(ChunkIndex);
+                    GetAttachments(File.GetChunk(Offset, Size).GetData());
+                    return !Ctx.IsCancelledFlag.load();
+                });
+
+            if (!Continue && Ctx.IsCancelledFlag.load())
+            {
+                return false;
+            }
+        }
+    }
+    for (const auto& It : StandaloneKeys)
+    {
+        if (Ctx.IsCancelledFlag.load())
+        {
+            return false;
+        }
+
+        IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first);
+        if (Buffer)
+        {
+            GetAttachments(Buffer.GetData());
+        }
+    }
+    return true;
+}
+
class DiskBucketReferenceChecker : public GcReferenceChecker
{
using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
@@ -3396,7 +3491,6 @@ public:
{
try
{
- m_IndexLock.reset();
m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
}
catch (const std::exception& Ex)
@@ -3419,114 +3513,25 @@ public:
}
ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_PrecachedReferences.size(),
+ m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- auto GetAttachments = [&](const void* CbObjectData) {
- CbObjectView Obj(CbObjectData);
- Obj.IterateAttachments([&](CbFieldView Field) { m_PrecachedReferences.emplace_back(Field.AsAttachment()); });
- };
-
- // Refresh cache
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); });
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); });
- std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
- {
- std::vector<IoHash> InlineKeys;
- std::vector<BlockStoreLocation> InlineLocations;
- std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
-
- {
- std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
-
- RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
- for (const auto& Entry : m_CacheBucket.m_Index)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- IndexLock.ReleaseNow();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
-
- PayloadIndex EntryIndex = Entry.second;
- const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneKeys.push_back(std::make_pair(Key, Loc));
- continue;
- }
-
- BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
- size_t ChunkIndex = InlineLocations.size();
- InlineLocations.push_back(ChunkLocation);
- InlineKeys.push_back(Key);
- if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
- {
- InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
- }
- else
- {
- BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
- InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
- }
- }
- }
-
- for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
- {
- ZEN_ASSERT(!ChunkIndexes.empty());
-
- bool Continue = m_CacheBucket.m_BlockStore.IterateBlock(
- InlineLocations,
- ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ZEN_UNUSED(ChunkIndex, Size);
- GetAttachments(Data);
- return !Ctx.IsCancelledFlag.load();
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ZEN_UNUSED(ChunkIndex);
- GetAttachments(File.GetChunk(Offset, Size).GetData());
- return !Ctx.IsCancelledFlag.load();
- });
-
- if (!Continue && Ctx.IsCancelledFlag.load())
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
- }
- }
- for (const auto& It : StandaloneKeys)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
+ RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
+ bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References);
+ IndexLock.ReleaseNow();
- IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first);
- if (Buffer)
- {
- GetAttachments(Buffer.GetData());
- }
- }
+ if (!Continue)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
}
}
- virtual void LockState(GcCtx& Ctx) override
+ virtual void UpdateLockedState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Bucket::LockState");
+ ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3536,32 +3541,28 @@ public:
}
ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_PrecachedReferences.size() + m_UncachedReferences.size(),
+ m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
if (Ctx.IsCancelledFlag.load())
{
- m_UncachedReferences = {};
- m_IndexLock.reset();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
+ m_References = {};
+ m_CacheBucket.m_TrackedReferences.reset();
return;
}
ZEN_ASSERT(m_CacheBucket.m_TrackedReferences);
HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences);
- m_UncachedReferences.reserve(AddedReferences.size());
- m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end());
+ m_References.reserve(m_References.size() + AddedReferences.size());
+ m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end());
AddedReferences = {};
}
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
-
- ZEN_ASSERT(m_IndexLock);
size_t InitialCount = IoCids.size();
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3576,18 +3577,7 @@ public:
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- for (const IoHash& ReferenceHash : m_PrecachedReferences)
- {
- if (IoCids.erase(ReferenceHash) == 1)
- {
- if (IoCids.empty())
- {
- return;
- }
- }
- }
-
- for (const IoHash& ReferenceHash : m_UncachedReferences)
+ for (const IoHash& ReferenceHash : m_References)
{
if (IoCids.erase(ReferenceHash) == 1)
{
@@ -3598,10 +3588,8 @@ public:
}
}
}
- CacheBucket& m_CacheBucket;
- std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
- std::vector<IoHash> m_PrecachedReferences;
- std::vector<IoHash> m_UncachedReferences;
+ CacheBucket& m_CacheBucket;
+ std::vector<IoHash> m_References;
};
std::vector<GcReferenceChecker*>
@@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
Reset(m_FreeMemCachedPayloads);
}
+RwLock::SharedLockScope
+ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock()
+{
+    // Hand the caller a shared scope on this bucket's index lock; the caller decides how long to keep it.
+    return RwLock::SharedLockScope{m_IndexLock};
+}
+
#if ZEN_WITH_TESTS
void
ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
@@ -3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
-
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedBuckets)
+ {
+ m_CapturedBuckets->push_back(std::string(BucketName));
+ }
+ });
return Result;
}
@@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
return Details;
}
+std::vector<RwLock::SharedLockScope>
+ZenCacheDiskLayer::GetGcReferencerLocks()
+{
+    // Order matters: the layer-wide lock is taken first, then one shared scope per bucket in m_Buckets.
+    std::vector<RwLock::SharedLockScope> HeldScopes;
+    HeldScopes.emplace_back(RwLock::SharedLockScope(m_Lock));
+    for (auto& [BucketName, Bucket] : m_Buckets)
+    {
+        HeldScopes.emplace_back(Bucket->GetGcReferencerLock());
+    }
+    return HeldScopes;
+}
+
+void
+ZenCacheDiskLayer::EnableUpdateCapture()
+{
+    // Ref-counted: the first enable allocates the capture list, nested enables only bump the counter.
+    RwLock::ExclusiveLockScope CaptureGuard(m_UpdateCaptureLock);
+    const bool                 IsFirstEnable = (m_UpdateCaptureRefCounter == 0);
+    if (IsFirstEnable)
+    {
+        ZEN_ASSERT(!m_CapturedBuckets);
+        m_CapturedBuckets = std::make_unique<std::vector<std::string>>();
+    }
+    else
+    {
+        ZEN_ASSERT(m_CapturedBuckets);
+    }
+    ++m_UpdateCaptureRefCounter;
+}
+
+void
+ZenCacheDiskLayer::DisableUpdateCapture()
+{
+    // Counterpart of EnableUpdateCapture(): the capture list is released when the last user disables it.
+    RwLock::ExclusiveLockScope CaptureGuard(m_UpdateCaptureLock);
+    ZEN_ASSERT(m_CapturedBuckets);
+    ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+    if (--m_UpdateCaptureRefCounter == 0)
+    {
+        m_CapturedBuckets.reset();
+    }
+}
+
+std::vector<std::string>
+ZenCacheDiskLayer::GetCapturedBuckets()
+{
+    // Returns a copy of the bucket names recorded so far, or an empty list when capture is not enabled.
+    RwLock::SharedLockScope CaptureGuard(m_UpdateCaptureLock);
+    if (!m_CapturedBuckets)
+    {
+        return {};
+    }
+    return *m_CapturedBuckets;
+}
+
void
ZenCacheDiskLayer::MemCacheTrim()
{
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index d9c2d3e59..d7d594af7 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -375,6 +375,23 @@ ZenCacheNamespace::GetValueDetails(const std::string_view BucketFilter, const st
return m_DiskLayer.GetValueDetails(BucketFilter, ValueFilter);
}
+std::vector<RwLock::SharedLockScope>
+ZenCacheNamespace::GetGcReferencerLocks()  // Forwarder: the lock scopes for this namespace are produced by its disk layer
+{
+ return m_DiskLayer.GetGcReferencerLocks();
+}
+
+void
+ZenCacheNamespace::EnableUpdateCapture()  // Forwarder: bucket capture state is kept by the disk layer
+{
+ m_DiskLayer.EnableUpdateCapture();
+}
+void
+ZenCacheNamespace::DisableUpdateCapture()  // Forwarder: must pair with an earlier EnableUpdateCapture() (the disk layer asserts on it)
+{
+ m_DiskLayer.DisableUpdateCapture();
+}
+
#if ZEN_WITH_TESTS
void
ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time)
@@ -439,11 +456,15 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc,
m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, NamespaceName),
m_Configuration.NamespaceConfig);
}
+ m_Gc.AddGcReferencer(*this);
+ m_Gc.AddGcReferenceLocker(*this);
}
ZenCacheStore::~ZenCacheStore()
{
ZEN_INFO("closing cache store at '{}'", m_BasePath);
+ m_Gc.RemoveGcReferenceLocker(*this);
+ m_Gc.RemoveGcReferencer(*this);
SetLoggingConfig({.EnableWriteLog = false, .EnableAccessLog = false});
m_Namespaces.clear();
}
@@ -844,6 +865,14 @@ ZenCacheStore::GetNamespace(std::string_view Namespace)
m_JobQueue,
m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace),
m_Configuration.NamespaceConfig));
+
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedNamespaces)
+ {
+ m_CapturedNamespaces->push_back(std::string(Namespace));
+ }
+ });
+
return NewNamespace.first->second.get();
}
@@ -1003,6 +1032,212 @@ ZenCacheStore::GetBucketInfo(std::string_view NamespaceName, std::string_view Bu
return {};
}
+std::vector<RwLock::SharedLockScope>
+ZenCacheStore::LockState(GcCtx&)
+{
+    // Lock order: the namespace map first, then whatever scopes each namespace reports for its disk layer.
+    std::vector<RwLock::SharedLockScope> HeldScopes;
+    HeldScopes.emplace_back(RwLock::SharedLockScope(m_NamespacesLock));
+    for (auto& NamespaceEntry : m_Namespaces)
+    {
+        std::vector<RwLock::SharedLockScope> LayerScopes = NamespaceEntry.second->GetGcReferencerLocks();
+        for (RwLock::SharedLockScope& Scope : LayerScopes)
+        {
+            // Ownership of each scope moves into the result so the lock stays held after LayerScopes dies
+            HeldScopes.emplace_back(std::move(Scope));
+        }
+    }
+    return HeldScopes;
+}
+
+void
+ZenCacheStore::EnableUpdateCapture()
+{
+    // Ref-counted: the first enable allocates the namespace capture list, nested enables only bump the counter.
+    m_UpdateCaptureLock.WithExclusiveLock([&]() {
+        if (m_UpdateCaptureRefCounter == 0)
+        {
+            ZEN_ASSERT(!m_CapturedNamespaces);
+            m_CapturedNamespaces = std::make_unique<std::vector<std::string>>();
+        }
+        else
+        {
+            ZEN_ASSERT(m_CapturedNamespaces);
+        }
+        m_UpdateCaptureRefCounter++;
+    });
+
+    // Namespaces can be created concurrently, so guard the map while walking it - the same way LockState() does.
+    // m_UpdateCaptureLock has already been released above, so the two locks are never held nested here.
+    RwLock::SharedLockScope NamespacesGuard(m_NamespacesLock);
+    for (auto& NamespaceIt : m_Namespaces)
+    {
+        NamespaceIt.second->EnableUpdateCapture();
+    }
+}
+
+void
+ZenCacheStore::DisableUpdateCapture()
+{
+    {
+        // Namespaces can be created concurrently, so guard the map while walking it - the same way LockState() does.
+        // The guard is dropped before m_UpdateCaptureLock is taken so the two locks are never held nested here.
+        RwLock::SharedLockScope NamespacesGuard(m_NamespacesLock);
+        for (auto& NamespaceIt : m_Namespaces)
+        {
+            // NOTE(review): a namespace created after EnableUpdateCapture() never had capture enabled on its
+            // disk layer, so this call looks like it would trip the asserts in
+            // ZenCacheDiskLayer::DisableUpdateCapture() - confirm
+            NamespaceIt.second->DisableUpdateCapture();
+        }
+    }
+
+    // Counterpart of EnableUpdateCapture(): the capture list is released when the last user disables it.
+    m_UpdateCaptureLock.WithExclusiveLock([&]() {
+        ZEN_ASSERT(m_CapturedNamespaces);
+        ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+        m_UpdateCaptureRefCounter--;
+        if (m_UpdateCaptureRefCounter == 0)
+        {
+            m_CapturedNamespaces.reset();
+        }
+    });
+}
+
+std::vector<std::string>
+ZenCacheStore::GetCapturedNamespaces()
+{
+    // Returns a copy of the namespace names recorded so far, or an empty list when capture is not enabled.
+    RwLock::SharedLockScope CaptureGuard(m_UpdateCaptureLock);
+    if (!m_CapturedNamespaces)
+    {
+        return {};
+    }
+    return *m_CapturedNamespaces;
+}
+
+std::string
+ZenCacheStore::GetGcName(GcCtx&)  // GC display name for this store; the context argument is unused
+{
+ return fmt::format("zencachestore: '{}'", m_BasePath.string());  // name embeds the store's base path
+}
+
+GcStoreCompactor*
+ZenCacheStore::RemoveExpiredData(GcCtx&, GcStats&)  // The store itself expires nothing; both arguments are unused
+{
+ return nullptr;  // no compactor is produced by this referencer
+}
+
+// Reference checker covering namespaces and buckets that are created while a GC pass is running.
+// Creation capture is enabled on the store for the lifetime of the checker; UpdateLockedState()
+// then gathers the references held by everything that was captured.
+class CacheStoreReferenceChecker : public GcReferenceChecker
+{
+public:
+    // Starts capturing newly created namespaces/buckets on the store
+    CacheStoreReferenceChecker(ZenCacheStore& InCacheStore) : m_CacheStore(InCacheStore) { m_CacheStore.EnableUpdateCapture(); }
+
+    // Stops capturing; exceptions are logged instead of being allowed to escape the destructor
+    virtual ~CacheStoreReferenceChecker()
+    {
+        try
+        {
+            m_CacheStore.DisableUpdateCapture();
+        }
+        catch (const std::exception& Ex)
+        {
+            ZEN_ERROR("~CacheStoreReferenceChecker threw exception: '{}'", Ex.what());
+        }
+    }
+
+    virtual std::string GetGcName(GcCtx&) override { return "cachestore"; }
+
+    // Nothing to pre-cache: only state captured during the GC pass is of interest here
+    virtual void PreCache(GcCtx&) override {}
+
+    // Collects the references of every bucket that appeared since capture was enabled.
+    // m_Namespaces and m_Buckets are read without taking their locks here; per the GcReferenceChecker
+    // contract that data is expected to already be locked by the registered GcReferenceLockers.
+    virtual void UpdateLockedState(GcCtx& Ctx) override
+    {
+        ZEN_TRACE_CPU("Z$::UpdateLockedState");
+
+        Stopwatch Timer;
+
+        std::vector<ZenCacheDiskLayer::CacheBucket*> AddedBuckets;
+
+        const auto _ = MakeGuard([&] {
+            if (!Ctx.Settings.Verbose)
+            {
+                return;
+            }
+            ZEN_INFO("GCV2: cachestore [LOCKSTATE] '{}': found {} references in {} in {} new buckets",
+                     "cachestore",
+                     m_References.size(),
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()),
+                     AddedBuckets.size());
+        });
+
+        std::vector<std::string> AddedNamespaces = m_CacheStore.GetCapturedNamespaces();
+
+        // Every bucket of a namespace created during capture is treated as new
+        for (const std::string& AddedNamespace : AddedNamespaces)
+        {
+            if (auto It = m_CacheStore.m_Namespaces.find(AddedNamespace); It != m_CacheStore.m_Namespaces.end())
+            {
+                ZenCacheNamespace& Namespace = *It->second;
+                for (auto& BucketKV : Namespace.m_DiskLayer.m_Buckets)
+                {
+                    AddedBuckets.push_back(BucketKV.second.get());
+                }
+            }
+        }
+        // Buckets captured by the disk layer of each namespace; names that can no longer be found are skipped
+        for (auto& NamespaceKV : m_CacheStore.m_Namespaces)
+        {
+            ZenCacheNamespace&       Namespace             = *NamespaceKV.second;
+            std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBuckets();
+            for (const std::string& AddedBucketName : NamespaceAddedBuckets)
+            {
+                if (auto It = Namespace.m_DiskLayer.m_Buckets.find(AddedBucketName); It != Namespace.m_DiskLayer.m_Buckets.end())
+                {
+                    AddedBuckets.push_back(It->second.get());
+                }
+            }
+        }
+
+        for (ZenCacheDiskLayer::CacheBucket* Bucket : AddedBuckets)
+        {
+            // GetReferencesLocked returning false stops the scan (it reports cancellation that way)
+            bool Continue = Bucket->GetReferencesLocked(Ctx, m_References);
+            if (!Continue)
+            {
+                break;
+            }
+        }
+    }
+
+    // Erases every hash in m_References from IoCids, stopping early once IoCids is empty
+    virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
+    {
+        ZEN_TRACE_CPU("Z$::RemoveUsedReferencesFromSet");
+
+        size_t     InitialCount = IoCids.size();
+        Stopwatch  Timer;
+        const auto _ = MakeGuard([&] {
+            if (!Ctx.Settings.Verbose)
+            {
+                return;
+            }
+            // Label is "cachestore" to match GetGcName() and the [LOCKSTATE] message above
+            ZEN_INFO("GCV2: cachestore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}",
+                     "cachestore",
+                     InitialCount - IoCids.size(),
+                     InitialCount,
+                     NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        });
+
+        for (const IoHash& ReferenceHash : m_References)
+        {
+            if (IoCids.erase(ReferenceHash) == 1)
+            {
+                if (IoCids.empty())
+                {
+                    return;
+                }
+            }
+        }
+    }
+
+private:
+    ZenCacheStore&      m_CacheStore;
+    std::vector<IoHash> m_References;  // references gathered by UpdateLockedState()
+};
+
+std::vector<GcReferenceChecker*>
+ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx)
+{
+    ZEN_TRACE_CPU("CacheStore::CreateReferenceCheckers");
+
+    Stopwatch  Timer;
+    const auto LogOnExit = MakeGuard([&] {
+        if (Ctx.Settings.Verbose)
+        {
+            ZEN_INFO("GCV2: cachestore [CREATE CHECKERS] '{}': completed in {}", m_BasePath, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+        }
+    });
+
+    // A single store-wide checker is returned as a raw pointer; the interface hands its lifetime to the caller
+    std::vector<GcReferenceChecker*> Result;
+    Result.push_back(new CacheStoreReferenceChecker(*this));
+    return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
#if ZEN_WITH_TESTS
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index e8cf6ec5e..8db34b9c5 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -382,7 +382,7 @@ WriteReferencerStats(CbObjectWriter& Writer, const GcReferencerStats& Stats, boo
Writer << "CreateReferenceCheckers" << ToTimeSpan(Stats.CreateReferenceCheckersMS);
Writer << "PreCacheState" << ToTimeSpan(Stats.PreCacheStateMS);
- Writer << "LockState" << ToTimeSpan(Stats.LockStateMS);
+ Writer << "UpdateLockedState" << ToTimeSpan(Stats.UpdateLockedStateMS);
Writer << "Elapsed" << ToTimeSpan(Stats.ElapsedMS);
};
@@ -452,6 +452,7 @@ WriteGCResult(CbObjectWriter& Writer, const GcResult& Result, bool HumanReadable
Writer << "CreateReferenceCheckers" << ToTimeSpan(Result.CreateReferenceCheckersMS);
Writer << "PreCacheState" << ToTimeSpan(Result.PreCacheStateMS);
Writer << "LockState" << ToTimeSpan(Result.LockStateMS);
+ Writer << "UpdateLockedState" << ToTimeSpan(Result.UpdateLockedStateMS);
Writer << "CreateReferencePruners" << ToTimeSpan(Result.CreateReferencePrunersMS);
Writer << "RemoveUnreferencedData" << ToTimeSpan(Result.RemoveUnreferencedDataMS);
@@ -510,7 +511,7 @@ void
Sum(GcReferencerStats& Stat)
{
Stat.ElapsedMS = Stat.RemoveExpiredDataStats.ElapsedMS + Stat.CompactStoreStats.ElapsedMS + Stat.CreateReferenceCheckersMS +
- Stat.PreCacheStateMS + Stat.LockStateMS;
+ Stat.PreCacheStateMS + Stat.UpdateLockedStateMS;
}
void
@@ -521,7 +522,7 @@ Add(GcReferencerStats& Sum, const GcReferencerStats& Sub)
Sum.CreateReferenceCheckersMS += Sub.CreateReferenceCheckersMS;
Sum.PreCacheStateMS += Sub.PreCacheStateMS;
- Sum.LockStateMS += Sub.LockStateMS;
+ Sum.UpdateLockedStateMS += Sub.UpdateLockedStateMS;
Sum.ElapsedMS += Sub.ElapsedMS;
}
@@ -584,6 +585,19 @@ GcManager::RemoveGcReferencer(GcReferencer& Referencer)
}
void
+GcManager::AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker)
+{
+    // The locker list is only modified under the manager's exclusive lock
+    RwLock::ExclusiveLockScope ManagerGuard(m_Lock);
+    GcReferenceLocker* const   Registered = &ReferenceLocker;
+    m_GcReferencerLockers.push_back(Registered);
+}
+void
+GcManager::RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker)
+{
+    // Drops every registration of this locker; the list is only modified under the manager's exclusive lock
+    RwLock::ExclusiveLockScope     ManagerGuard(m_Lock);
+    const GcReferenceLocker* const Target = &ReferenceLocker;
+    std::erase_if(m_GcReferencerLockers, [Target](const GcReferenceLocker* Candidate) { return Candidate == Target; });
+}
+
+void
GcManager::AddGcReferenceStore(GcReferenceStore& ReferenceStore)
{
RwLock::ExclusiveLockScope _(m_Lock);
@@ -879,59 +893,90 @@ GcManager::CollectGarbage(const GcSettings& Settings)
}
}
+ std::vector<RwLock::SharedLockScope> LockerScopes;
SCOPED_TIMER(uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); Result.WriteBlockMS = std::chrono::milliseconds(ElapsedMS);
ZEN_INFO("GCV2: Writes blocked for {}", NiceTimeSpanMs(ElapsedMS)));
{
- ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
if (!ReferenceCheckers.empty())
{
if (CheckGCCancel())
{
return Sum(Result, true);
}
- ZEN_TRACE_CPU("GcV2::LockState");
-
- // Locking all references checkers so we have a steady state of which references are used
- // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
- // we delete the ReferenceCheckers
- Latch WorkLeft(1);
-
+ ZEN_INFO("GCV2: Locking state for {} reference checkers", ReferenceCheckers.size());
{
- SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());
- if (Ctx.Settings.Verbose) {
- ZEN_INFO("GCV2: Locked state using {} reference checkers in {}",
- ReferenceCheckers.size(),
- NiceTimeSpanMs(Result.LockStateMS.count()));
- });
- for (auto& It : ReferenceCheckers)
+ ZEN_TRACE_CPU("GcV2::LockReferencers");
+ // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
+ // we delete the ReferenceLockers
+ Latch WorkLeft(1);
{
- if (CheckGCCancel())
+ SCOPED_TIMER(Result.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());
+ if (Ctx.Settings.Verbose) {
+ ZEN_INFO("GCV2: Locked referencers using {} reference lockers in {}",
+ ReferenceCheckers.size(),
+ NiceTimeSpanMs(Result.LockStateMS.count()));
+ });
+ for (GcReferenceLocker* ReferenceLocker : m_GcReferencerLockers)
{
- WorkLeft.CountDown();
- WorkLeft.Wait();
- return Sum(Result, true);
- }
-
- GcReferenceChecker* Checker = It.first.get();
- size_t Index = It.second;
- std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
- WorkLeft.AddCount(1);
- ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
- auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
- try
+ std::vector<RwLock::SharedLockScope> LockScopes = ReferenceLocker->LockState(Ctx);
+ for (auto It = std::make_move_iterator(LockScopes.begin());
+ It != std::make_move_iterator(LockScopes.end());
+ It++)
{
- SCOPED_TIMER(Stats->second.LockStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
- Checker->LockState(Ctx);
+ LockerScopes.emplace_back(std::move(*It));
}
- catch (const std::exception& Ex)
+ }
+ }
+ }
+ ZEN_INFO("GCV2: Updating locked state for {} reference checkers", ReferenceCheckers.size());
+ {
+ ZEN_TRACE_CPU("GcV2::UpdateLockedState");
+
+ // Locking all references checkers so we have a steady state of which references are used
+ // From this point we have blocked all writes to all References (DiskBucket/ProjectStore) until
+ // we delete the ReferenceCheckers
+ Latch WorkLeft(1);
+
+ {
+ SCOPED_TIMER(Result.UpdateLockedStateMS = std::chrono::milliseconds(Timer.GetElapsedTimeMs());
+ if (Ctx.Settings.Verbose) {
+ ZEN_INFO("GCV2: Updated locked state using {} reference checkers in {}",
+ ReferenceCheckers.size(),
+ NiceTimeSpanMs(Result.UpdateLockedStateMS.count()));
+ });
+ for (auto& It : ReferenceCheckers)
+ {
+ if (CheckGCCancel())
{
- ZEN_ERROR("GCV2: Failed locking state for {}. Reason: '{}'", Checker->GetGcName(Ctx), Ex.what());
- SetCancelGC(true);
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
+ return Sum(Result, true);
}
- });
+
+ GcReferenceChecker* Checker = It.first.get();
+ size_t Index = It.second;
+ std::pair<std::string, GcReferencerStats>* Stats = &Result.ReferencerStats[Index];
+ WorkLeft.AddCount(1);
+ ThreadPool.ScheduleWork([this, &Ctx, Checker, Index, Stats, &WorkLeft]() {
+ auto _ = MakeGuard([&WorkLeft]() { WorkLeft.CountDown(); });
+ try
+ {
+ SCOPED_TIMER(Stats->second.UpdateLockedStateMS =
+ std::chrono::milliseconds(Timer.GetElapsedTimeMs()););
+ Checker->UpdateLockedState(Ctx);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("GCV2: Failed Updating locked state for {}. Reason: '{}'",
+ Checker->GetGcName(Ctx),
+ Ex.what());
+ SetCancelGC(true);
+ }
+ });
+ }
+ WorkLeft.CountDown();
+ WorkLeft.Wait();
}
- WorkLeft.CountDown();
- WorkLeft.Wait();
}
}
}
@@ -1020,6 +1065,7 @@ GcManager::CollectGarbage(const GcSettings& Settings)
}
}
// Let the GcReferencers add new data, we will only change on-disk data at this point, adding new data is allowed
+ LockerScopes.clear();
ReferenceCheckers.clear();
ReferencePruners.clear();
}
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 9dee4d3f7..537f4396a 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -197,6 +197,12 @@ public:
CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+ std::vector<RwLock::SharedLockScope> GetGcReferencerLocks();
+
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ std::vector<std::string> GetCapturedBuckets();
+
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
@@ -227,6 +233,8 @@ public:
void ScrubStorage(ScrubContext& Ctx);
void GatherReferences(GcContext& GcCtx);
void CollectGarbage(GcContext& GcCtx);
+ RwLock::SharedLockScope GetGcReferencerLock();
+ bool GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences);
inline GcStorageSize StorageSize() const
{
@@ -461,12 +469,16 @@ private:
mutable RwLock m_Lock;
std::unordered_map<std::string, std::unique_ptr<CacheBucket>> m_Buckets;
std::vector<std::unique_ptr<CacheBucket>> m_DroppedBuckets;
+ mutable RwLock m_UpdateCaptureLock;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<std::string>> m_CapturedBuckets;
ZenCacheDiskLayer(const ZenCacheDiskLayer&) = delete;
ZenCacheDiskLayer& operator=(const ZenCacheDiskLayer&) = delete;
friend class DiskBucketStoreCompactor;
friend class DiskBucketReferenceChecker;
+ friend class CacheStoreReferenceChecker;
};
} // namespace zen
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 7460d01ce..9160db667 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -48,6 +48,7 @@ class JobQueue;
projects from each other.
*/
+
class ZenCacheNamespace final : public GcStorage, public GcContributor
{
public:
@@ -118,6 +119,11 @@ public:
CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const;
+ std::vector<RwLock::SharedLockScope> GetGcReferencerLocks();
+
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+
#if ZEN_WITH_TESTS
void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time);
#endif // ZEN_WITH_TESTS
@@ -137,6 +143,8 @@ private:
ZenCacheNamespace(const ZenCacheNamespace&) = delete;
ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete;
+
+ friend class CacheStoreReferenceChecker;
};
/** Cache store interface
@@ -145,7 +153,7 @@ private:
*/
-class ZenCacheStore final : public RefCounted, public StatsProvider
+class ZenCacheStore final : public RefCounted, public StatsProvider, public GcReferencer, public GcReferenceLocker
{
public:
static constexpr std::string_view DefaultNamespace =
@@ -271,6 +279,16 @@ public:
// StatsProvider
virtual void ReportMetrics(StatsMetrics& Statsd) override;
+ virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) override;
+
+ virtual std::string GetGcName(GcCtx& Ctx) override;
+ virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override;
+ virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override;
+
+ void EnableUpdateCapture();
+ void DisableUpdateCapture();
+ std::vector<std::string> GetCapturedNamespaces();
+
private:
const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
@@ -283,6 +301,9 @@ private:
mutable RwLock m_NamespacesLock;
NamespaceMap m_Namespaces;
std::vector<std::unique_ptr<ZenCacheNamespace>> m_DroppedNamespaces;
+ mutable RwLock m_UpdateCaptureLock;
+ uint32_t m_UpdateCaptureRefCounter = 0;
+ std::unique_ptr<std::vector<std::string>> m_CapturedNamespaces;
GcManager& m_Gc;
JobQueue& m_JobQueue;
@@ -314,6 +335,8 @@ private:
std::thread m_AsyncLoggingThread;
std::atomic_bool m_WriteLogEnabled;
std::atomic_bool m_AccessLogEnabled;
+
+ friend class CacheStoreReferenceChecker;
};
void structured_cachestore_forcelink();
diff --git a/src/zenstore/include/zenstore/gc.h b/src/zenstore/include/zenstore/gc.h
index 5262c6d2e..c3a71baa6 100644
--- a/src/zenstore/include/zenstore/gc.h
+++ b/src/zenstore/include/zenstore/gc.h
@@ -88,7 +88,7 @@ struct GcReferencerStats
std::chrono::milliseconds CreateReferenceCheckersMS = {};
std::chrono::milliseconds PreCacheStateMS = {};
- std::chrono::milliseconds LockStateMS = {};
+ std::chrono::milliseconds UpdateLockedStateMS = {};
std::chrono::milliseconds ElapsedMS = {};
};
@@ -115,6 +115,7 @@ struct GcResult
std::chrono::milliseconds CreateReferenceCheckersMS = {};
std::chrono::milliseconds PreCacheStateMS = {};
std::chrono::milliseconds LockStateMS = {};
+ std::chrono::milliseconds UpdateLockedStateMS = {};
std::chrono::milliseconds CreateReferencePrunersMS = {};
std::chrono::milliseconds RemoveUnreferencedDataMS = {};
@@ -177,13 +178,19 @@ public:
virtual std::string GetGcName(GcCtx& Ctx) = 0;
+ // Read as much of the current state - nothing is locked for you here so you need to lock as appropriate
virtual void PreCache(GcCtx& Ctx) = 0;
- // Lock the state and make sure no references changes, usually a read-lock is taken until the destruction
- // of the instance. Called once before any calls to RemoveUsedReferencesFromSet
- // The implementation should be as fast as possible as LockState is part of a stop the world (from changes)
- // until all instances of GcReferenceChecker are deleted
- virtual void LockState(GcCtx& Ctx) = 0;
+ // Update the state after all ReferenceCheckers have completed PreCache and all ReferenceLockers have
+ // completed their LockState operation.
+ // At this stage all data that UpdateLockedState needs to touch should be locked by the ReferenceLockers.
+ // *IMPORTANT* Do *not* take any locks (shared or exclusive) in this code.
+ // This is because we need to acquire the locks in an ordered manner and not end up in a deadlock due to other code
+ // trying to get exclusive locks halfway through our execution.
+ // Called once before any calls to RemoveUsedReferencesFromSet.
+ // The implementation should be as fast as possible as UpdateLockedState is part of a stop the world (from changes)
+ // until the UpdateLockedState calls of all GcReferenceChecker instances have completed
+ virtual void UpdateLockedState(GcCtx& Ctx) = 0;
// Go through IoCids and see which ones are referenced. If it is the reference must be removed from IoCids
// This function should use pre-cached information on what is referenced as we are in stop the world mode
@@ -191,6 +198,22 @@ public:
};
/**
+ * @brief An interface to implement a lock for Stop The World (from writing new data)
+ *
+ * This interface is registered/unregistered to GcManager via AddGcReferenceLocker() and RemoveGcReferenceLocker()
+ */
+class GcReferenceLocker
+{
+public:
+ virtual ~GcReferenceLocker() = default;
+
+ // Take all the locks needed to execute UpdateLockedState for all the GcReferenceCheckers in your domain.
+ // Once every GcReferenceChecker has executed UpdateLockedState, and RemoveUsedReferencesFromSet has completed
+ // for all domains, the returned locks are disposed and writes are allowed once again
+ virtual std::vector<RwLock::SharedLockScope> LockState(GcCtx& Ctx) = 0;
+};
+
+/**
* @brief Interface to handle GC of data that references Cid data
*
* This interface is registered/unregistered to GcManager vua AddGcReferencer() and RemoveGcReferencer()
@@ -203,7 +226,7 @@ protected:
public:
virtual std::string GetGcName(GcCtx& Ctx) = 0;
- // Remove expired data based on either GcCtx::Settings CacheExpireTime/ProjectExpireTime
+ // Remove expired data based on either GcCtx::Settings CacheExpireTime or ProjectExpireTime
virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) = 0;
// Create 0-n GcReferenceChecker for this GcReferencer. Caller will manage lifetime of
@@ -350,6 +373,9 @@ public:
void AddGcReferencer(GcReferencer& Referencer);
void RemoveGcReferencer(GcReferencer& Referencer);
+ void AddGcReferenceLocker(GcReferenceLocker& ReferenceLocker);
+ void RemoveGcReferenceLocker(GcReferenceLocker& ReferenceLocker);
+
void AddGcReferenceStore(GcReferenceStore& ReferenceStore);
void RemoveGcReferenceStore(GcReferenceStore& ReferenceStore);
@@ -382,8 +408,9 @@ private:
CidStore* m_CidStore = nullptr;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
- std::vector<GcReferencer*> m_GcReferencers;
- std::vector<GcReferenceStore*> m_GcReferenceStores;
+ std::vector<GcReferencer*> m_GcReferencers;
+ std::vector<GcReferenceLocker*> m_GcReferencerLockers;
+ std::vector<GcReferenceStore*> m_GcReferenceStores;
std::atomic_bool m_CancelGC{false};
};