aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-06-13 08:53:01 +0200
committerGitHub Enterprise <[email protected]>2024-06-13 08:53:01 +0200
commitb71d52375e41a084e661d0f55f044ca8982312a4 (patch)
treef44e74a13e29d50ab36d1eebfdb4c7e606d879a9 /src/zenstore/cache/cachedisklayer.cpp
parent5.5.3-pre1 (diff)
downloadzen-b71d52375e41a084e661d0f55f044ca8982312a4.tar.xz
zen-b71d52375e41a084e661d0f55f044ca8982312a4.zip
Make sure we monitor for new project, oplogs, namespaces and buckets during GCv2 (#93)
- Bugfix: Make sure we monitor and include new project/oplogs created during GCv2
- Bugfix: Make sure we monitor and include new namespaces/cache buckets created during GCv2
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp305
1 files changed, 179 insertions, 126 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 9dd2e4a67..f865e1c3c 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -3383,6 +3383,101 @@ ZenCacheDiskLayer::CacheBucket::RemoveExpiredData(GcCtx& Ctx, GcStats& Stats)
return new DiskBucketStoreCompactor(*this, std::move(ExpiredStandaloneKeys));
}
+bool
+ZenCacheDiskLayer::CacheBucket::GetReferencesLocked(GcCtx& Ctx, std::vector<IoHash>& OutReferences)
+{
+ auto GetAttachments = [&](const void* CbObjectData) {
+ CbObjectView Obj(CbObjectData);
+ Obj.IterateAttachments([&](CbFieldView Field) { OutReferences.emplace_back(Field.AsAttachment()); });
+ };
+
+ std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
+ {
+ std::vector<IoHash> InlineKeys;
+ std::vector<BlockStoreLocation> InlineLocations;
+ std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
+
+ {
+ std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
+
+ for (const auto& Entry : m_Index)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+
+ PayloadIndex EntryIndex = Entry.second;
+ const BucketPayload& Payload = m_Payloads[EntryIndex];
+ const DiskLocation& Loc = Payload.Location;
+
+ if (!Loc.IsFlagSet(DiskLocation::kStructured))
+ {
+ continue;
+ }
+ const IoHash& Key = Entry.first;
+ if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
+ {
+ StandaloneKeys.push_back(std::make_pair(Key, Loc));
+ continue;
+ }
+
+ BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_Configuration.PayloadAlignment);
+ size_t ChunkIndex = InlineLocations.size();
+ InlineLocations.push_back(ChunkLocation);
+ InlineKeys.push_back(Key);
+ if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
+ {
+ InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
+ }
+ else
+ {
+ BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
+ InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
+ }
+ }
+ }
+
+ for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
+ {
+ ZEN_ASSERT(!ChunkIndexes.empty());
+
+ bool Continue = m_BlockStore.IterateBlock(
+ InlineLocations,
+ ChunkIndexes,
+ [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex, Size);
+ GetAttachments(Data);
+ return !Ctx.IsCancelledFlag.load();
+ },
+ [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
+ ZEN_UNUSED(ChunkIndex);
+ GetAttachments(File.GetChunk(Offset, Size).GetData());
+ return !Ctx.IsCancelledFlag.load();
+ });
+
+ if (!Continue && Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+ }
+ }
+ for (const auto& It : StandaloneKeys)
+ {
+ if (Ctx.IsCancelledFlag.load())
+ {
+ return false;
+ }
+
+ IoBuffer Buffer = GetStandaloneCacheValue(It.second, It.first);
+ if (Buffer)
+ {
+ GetAttachments(Buffer.GetData());
+ }
+ }
+ return true;
+}
+
class DiskBucketReferenceChecker : public GcReferenceChecker
{
using PayloadIndex = ZenCacheDiskLayer::CacheBucket::PayloadIndex;
@@ -3396,7 +3491,6 @@ public:
{
try
{
- m_IndexLock.reset();
m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
}
catch (const std::exception& Ex)
@@ -3419,114 +3513,25 @@ public:
}
ZEN_INFO("GCV2: cachebucket [PRECACHE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_PrecachedReferences.size(),
+ m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- auto GetAttachments = [&](const void* CbObjectData) {
- CbObjectView Obj(CbObjectData);
- Obj.IterateAttachments([&](CbFieldView Field) { m_PrecachedReferences.emplace_back(Field.AsAttachment()); });
- };
-
- // Refresh cache
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); });
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences = std::make_unique<HashSet>(); });
- std::vector<std::pair<IoHash, DiskLocation>> StandaloneKeys;
- {
- std::vector<IoHash> InlineKeys;
- std::vector<BlockStoreLocation> InlineLocations;
- std::vector<std::vector<std::size_t>> InlineBlockChunkIndexes;
-
- {
- std::unordered_map<uint32_t, std::size_t> BlockIndexToChunkIndexes;
-
- RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
- for (const auto& Entry : m_CacheBucket.m_Index)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- IndexLock.ReleaseNow();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
-
- PayloadIndex EntryIndex = Entry.second;
- const BucketPayload& Payload = m_CacheBucket.m_Payloads[EntryIndex];
- const DiskLocation& Loc = Payload.Location;
-
- if (!Loc.IsFlagSet(DiskLocation::kStructured))
- {
- continue;
- }
- const IoHash& Key = Entry.first;
- if (Loc.IsFlagSet(DiskLocation::kStandaloneFile))
- {
- StandaloneKeys.push_back(std::make_pair(Key, Loc));
- continue;
- }
-
- BlockStoreLocation ChunkLocation = Loc.GetBlockLocation(m_CacheBucket.m_Configuration.PayloadAlignment);
- size_t ChunkIndex = InlineLocations.size();
- InlineLocations.push_back(ChunkLocation);
- InlineKeys.push_back(Key);
- if (auto It = BlockIndexToChunkIndexes.find(ChunkLocation.BlockIndex); It != BlockIndexToChunkIndexes.end())
- {
- InlineBlockChunkIndexes[It->second].push_back(ChunkIndex);
- }
- else
- {
- BlockIndexToChunkIndexes.insert_or_assign(ChunkLocation.BlockIndex, InlineBlockChunkIndexes.size());
- InlineBlockChunkIndexes.emplace_back(std::vector<size_t>{ChunkIndex});
- }
- }
- }
-
- for (std::vector<std::size_t> ChunkIndexes : InlineBlockChunkIndexes)
- {
- ZEN_ASSERT(!ChunkIndexes.empty());
-
- bool Continue = m_CacheBucket.m_BlockStore.IterateBlock(
- InlineLocations,
- ChunkIndexes,
- [&](size_t ChunkIndex, const void* Data, uint64_t Size) {
- ZEN_UNUSED(ChunkIndex, Size);
- GetAttachments(Data);
- return !Ctx.IsCancelledFlag.load();
- },
- [&](size_t ChunkIndex, BlockStoreFile& File, uint64_t Offset, uint64_t Size) {
- ZEN_UNUSED(ChunkIndex);
- GetAttachments(File.GetChunk(Offset, Size).GetData());
- return !Ctx.IsCancelledFlag.load();
- });
-
- if (!Continue && Ctx.IsCancelledFlag.load())
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
- }
- }
- for (const auto& It : StandaloneKeys)
- {
- if (Ctx.IsCancelledFlag.load())
- {
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
- return;
- }
+ RwLock::SharedLockScope IndexLock(m_CacheBucket.m_IndexLock);
+ bool Continue = m_CacheBucket.GetReferencesLocked(Ctx, m_References);
+ IndexLock.ReleaseNow();
- IoBuffer Buffer = m_CacheBucket.GetStandaloneCacheValue(It.second, It.first);
- if (Buffer)
- {
- GetAttachments(Buffer.GetData());
- }
- }
+ if (!Continue)
+ {
+ m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
}
}
- virtual void LockState(GcCtx& Ctx) override
+ virtual void UpdateLockedState(GcCtx& Ctx) override
{
- ZEN_TRACE_CPU("Z$::Bucket::LockState");
+ ZEN_TRACE_CPU("Z$::Bucket::UpdateLockedState");
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3536,32 +3541,28 @@ public:
}
ZEN_INFO("GCV2: cachebucket [LOCKSTATE] '{}': found {} references in {}",
m_CacheBucket.m_BucketDir,
- m_PrecachedReferences.size() + m_UncachedReferences.size(),
+ m_References.size(),
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- m_IndexLock = std::make_unique<RwLock::SharedLockScope>(m_CacheBucket.m_IndexLock);
if (Ctx.IsCancelledFlag.load())
{
- m_UncachedReferences = {};
- m_IndexLock.reset();
- m_CacheBucket.m_IndexLock.WithExclusiveLock([&]() { m_CacheBucket.m_TrackedReferences.reset(); });
+ m_References = {};
+ m_CacheBucket.m_TrackedReferences.reset();
return;
}
ZEN_ASSERT(m_CacheBucket.m_TrackedReferences);
HashSet& AddedReferences(*m_CacheBucket.m_TrackedReferences);
- m_UncachedReferences.reserve(AddedReferences.size());
- m_UncachedReferences.insert(m_UncachedReferences.end(), AddedReferences.begin(), AddedReferences.end());
+ m_References.reserve(m_References.size() + AddedReferences.size());
+ m_References.insert(m_References.end(), AddedReferences.begin(), AddedReferences.end());
AddedReferences = {};
}
virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override
{
ZEN_TRACE_CPU("Z$::Bucket::RemoveUsedReferencesFromSet");
-
- ZEN_ASSERT(m_IndexLock);
size_t InitialCount = IoCids.size();
Stopwatch Timer;
const auto _ = MakeGuard([&] {
@@ -3576,18 +3577,7 @@ public:
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
- for (const IoHash& ReferenceHash : m_PrecachedReferences)
- {
- if (IoCids.erase(ReferenceHash) == 1)
- {
- if (IoCids.empty())
- {
- return;
- }
- }
- }
-
- for (const IoHash& ReferenceHash : m_UncachedReferences)
+ for (const IoHash& ReferenceHash : m_References)
{
if (IoCids.erase(ReferenceHash) == 1)
{
@@ -3598,10 +3588,8 @@ public:
}
}
}
- CacheBucket& m_CacheBucket;
- std::unique_ptr<RwLock::SharedLockScope> m_IndexLock;
- std::vector<IoHash> m_PrecachedReferences;
- std::vector<IoHash> m_UncachedReferences;
+ CacheBucket& m_CacheBucket;
+ std::vector<IoHash> m_References;
};
std::vector<GcReferenceChecker*>
@@ -3673,6 +3661,12 @@ ZenCacheDiskLayer::CacheBucket::CompactState(RwLock::ExclusiveLockScope&,
Reset(m_FreeMemCachedPayloads);
}
+RwLock::SharedLockScope
+ZenCacheDiskLayer::CacheBucket::GetGcReferencerLock()
+{
+ return RwLock::SharedLockScope(m_IndexLock);
+}
+
#if ZEN_WITH_TESTS
void
ZenCacheDiskLayer::CacheBucket::SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time)
@@ -3763,7 +3757,12 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
CacheBucket* Result = Bucket.get();
m_Buckets.emplace(BucketName, std::move(Bucket));
-
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_CapturedBuckets)
+ {
+ m_CapturedBuckets->push_back(std::string(BucketName));
+ }
+ });
return Result;
}
@@ -4317,6 +4316,60 @@ ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const st
return Details;
}
+std::vector<RwLock::SharedLockScope>
+ZenCacheDiskLayer::GetGcReferencerLocks()
+{
+ std::vector<RwLock::SharedLockScope> Locks;
+ Locks.emplace_back(RwLock::SharedLockScope(m_Lock));
+ for (auto& Kv : m_Buckets)
+ {
+ Locks.emplace_back(Kv.second->GetGcReferencerLock());
+ }
+ return Locks;
+}
+
+void
+ZenCacheDiskLayer::EnableUpdateCapture()
+{
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ if (m_UpdateCaptureRefCounter == 0)
+ {
+ ZEN_ASSERT(!m_CapturedBuckets);
+ m_CapturedBuckets = std::make_unique<std::vector<std::string>>();
+ }
+ else
+ {
+ ZEN_ASSERT(m_CapturedBuckets);
+ }
+ m_UpdateCaptureRefCounter++;
+ });
+}
+
+void
+ZenCacheDiskLayer::DisableUpdateCapture()
+{
+ m_UpdateCaptureLock.WithExclusiveLock([&]() {
+ ZEN_ASSERT(m_CapturedBuckets);
+ ZEN_ASSERT(m_UpdateCaptureRefCounter > 0);
+ m_UpdateCaptureRefCounter--;
+ if (m_UpdateCaptureRefCounter == 0)
+ {
+ m_CapturedBuckets.reset();
+ }
+ });
+}
+
+std::vector<std::string>
+ZenCacheDiskLayer::GetCapturedBuckets()
+{
+ RwLock::SharedLockScope _(m_UpdateCaptureLock);
+ if (m_CapturedBuckets)
+ {
+ return *m_CapturedBuckets;
+ }
+ return {};
+}
+
void
ZenCacheDiskLayer::MemCacheTrim()
{