diff options
Diffstat (limited to 'src/zenstore/cache/structuredcachestore.cpp')
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 428 |
1 files changed, 112 insertions, 316 deletions
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index c98561ebb..c14ea73a8 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -42,8 +42,20 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <unordered_map> #endif +#include <zencore/memory/llm.h> + +////////////////////////////////////////////////////////////////////////// + namespace zen { +const FLLMTag& +GetCacheStoreTag() +{ + static FLLMTag _("store", FLLMTag("cache")); + + return _; +} + bool IsKnownBadBucketName(std::string_view Bucket) { @@ -122,19 +134,18 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st , m_Configuration(Config) , m_DiskLayer(m_Gc, m_JobQueue, m_RootDir, m_Configuration.DiskLayerConfig) { + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_INFO("initializing structured cache at '{}'", m_RootDir); CreateDirectories(m_RootDir); m_DiskLayer.DiscoverBuckets(); - m_Gc.AddGcContributor(this); m_Gc.AddGcStorage(this); } ZenCacheNamespace::~ZenCacheNamespace() { m_Gc.RemoveGcStorage(this); - m_Gc.RemoveGcContributor(this); } struct ZenCacheNamespace::PutBatchHandle @@ -307,26 +318,6 @@ ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) m_DiskLayer.ScrubStorage(Ctx); } -void -ZenCacheNamespace::GatherReferences(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::ZenCacheNamespace::GatherReferences"); - - Stopwatch Timer; - const auto Guard = - MakeGuard([&] { ZEN_DEBUG("cache gathered all references from '{}' in {}", m_RootDir, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - - m_DiskLayer.GatherReferences(GcCtx); -} - -void -ZenCacheNamespace::CollectGarbage(GcContext& GcCtx) -{ - ZEN_TRACE_CPU("Z$::Namespace::CollectGarbage"); - - m_DiskLayer.CollectGarbage(GcCtx); -} - GcStorageSize ZenCacheNamespace::StorageSize() const { @@ -386,12 +377,19 @@ ZenCacheNamespace::EnableUpdateCapture() { m_DiskLayer.EnableUpdateCapture(); } + void ZenCacheNamespace::DisableUpdateCapture() { m_DiskLayer.DisableUpdateCapture(); } +bool +ZenCacheNamespace::GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const +{ + return m_DiskLayer.GetContentStats(BucketName, OutContentStats); +} + #if ZEN_WITH_TESTS void ZenCacheNamespace::SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time) @@ -418,6 +416,8 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, , m_Configuration(Configuration) , m_ExitLogging(false) { + ZEN_MEMSCOPE(GetCacheStoreTag()); + SetLoggingConfig(m_Configuration.Logging); CreateDirectories(m_BasePath); @@ -474,6 +474,8 @@ ZenCacheStore::LogWorker() { SetCurrentThreadName("ZenCacheStore::LogWorker"); + ZEN_MEMSCOPE(GetCacheStoreTag()); + LoggerRef ZCacheLog(logging::Get("z$")); auto Log = [&ZCacheLog]() -> LoggerRef { return ZCacheLog; }; @@ -554,6 +556,7 @@ ZenCacheStore::LogWorker() ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult) : m_CacheStore(CacheStore) { + ZEN_MEMSCOPE(GetCacheStoreTag()); if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) { m_NamespaceBatchHandle = m_Store->BeginPutBatch(OutResult); @@ -566,6 +569,7 @@ ZenCacheStore::PutBatch::~PutBatch() { if (m_Store) { + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_ASSERT(m_NamespaceBatchHandle); m_Store->EndPutBatch(m_NamespaceBatchHandle); } @@ -580,6 +584,7 @@ ZenCacheStore::GetBatch::GetBatch(ZenCacheStore& CacheStore, std::string_view In : m_CacheStore(CacheStore) , Results(OutResult) { + ZEN_MEMSCOPE(GetCacheStoreTag()); if (m_Store = m_CacheStore.GetNamespace(InNamespace); m_Store) { m_NamespaceBatchHandle = m_Store->BeginGetBatch(OutResult); @@ -592,6 +597,7 @@ ZenCacheStore::GetBatch::~GetBatch() { if (m_Store) { + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_ASSERT(m_NamespaceBatchHandle); m_Store->EndGetBatch(m_NamespaceBatchHandle); @@ -628,6 +634,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context, return false; } + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_TRACE_CPU("Z$::Get"); metrics::RequestStats::Scope OpScope(m_GetOps, 0); @@ -688,6 +695,8 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_RejectedReadCount++; return; } + + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_TRACE_CPU("Z$::Get"); metrics::RequestStats::Scope OpScope(m_GetOps, 0); @@ -724,6 +733,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, return; } + ZEN_MEMSCOPE(GetCacheStoreTag()); ZEN_TRACE_CPU("Z$::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.GetSize()); @@ -791,6 +801,8 @@ ZenCacheStore::DropNamespace(std::string_view InNamespace) void ZenCacheStore::Flush() { + ZEN_MEMSCOPE(GetCacheStoreTag()); + ZEN_INFO("flushing cache store at '{}'", m_BasePath); IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } @@ -866,12 +878,10 @@ ZenCacheStore::GetNamespace(std::string_view Namespace) m_BasePath / fmt::format("{}{}", NamespaceDiskPrefix, Namespace), m_Configuration.NamespaceConfig)); - m_UpdateCaptureLock.WithExclusiveLock([&]() { - if (m_CapturedNamespaces) - { - m_CapturedNamespaces->push_back(std::string(Namespace)); - } - }); + if (m_CapturedNamespaces) + { + m_CapturedNamespaces->push_back(std::string(Namespace)); + } return NewNamespace.first->second.get(); } @@ -1055,7 +1065,8 @@ ZenCacheStore::LockState(GcCtx& Ctx) void ZenCacheStore::EnableUpdateCapture() { - m_UpdateCaptureLock.WithExclusiveLock([&]() { + std::vector<ZenCacheNamespace*> Namespaces; + m_NamespacesLock.WithExclusiveLock([&]() { if (m_UpdateCaptureRefCounter == 0) { ZEN_ASSERT(!m_CapturedNamespaces); @@ -1066,21 +1077,24 @@ ZenCacheStore::EnableUpdateCapture() ZEN_ASSERT(m_CapturedNamespaces); } m_UpdateCaptureRefCounter++; + Namespaces.reserve(m_Namespaces.size()); + for (auto& NamespaceIt : m_Namespaces) + { + Namespaces.push_back(NamespaceIt.second.get()); + } }); - for (auto& NamespaceIt : m_Namespaces) + + for (ZenCacheNamespace* Namespace : Namespaces) { - NamespaceIt.second->EnableUpdateCapture(); + Namespace->EnableUpdateCapture(); } } void ZenCacheStore::DisableUpdateCapture() { - for (auto& NamespaceIt : m_Namespaces) - { - NamespaceIt.second->DisableUpdateCapture(); - } - m_UpdateCaptureLock.WithExclusiveLock([&]() { + std::vector<ZenCacheNamespace*> Namespaces; + m_NamespacesLock.WithExclusiveLock([&]() { ZEN_ASSERT(m_CapturedNamespaces); ZEN_ASSERT(m_UpdateCaptureRefCounter > 0); m_UpdateCaptureRefCounter--; @@ -1088,13 +1102,21 @@ ZenCacheStore::DisableUpdateCapture() { m_CapturedNamespaces.reset(); } + Namespaces.reserve(m_Namespaces.size()); + for (auto& NamespaceIt : m_Namespaces) + { + Namespaces.push_back(NamespaceIt.second.get()); + } }); + for (ZenCacheNamespace* Namespace : Namespaces) + { + Namespace->DisableUpdateCapture(); + } } std::vector<std::string> -ZenCacheStore::GetCapturedNamespaces() +ZenCacheStore::GetCapturedNamespacesLocked() { - RwLock::SharedLockScope _(m_UpdateCaptureLock); if (m_CapturedNamespaces) { return *m_CapturedNamespaces; @@ -1102,6 +1124,16 @@ ZenCacheStore::GetCapturedNamespaces() return {}; } +bool +ZenCacheStore::GetContentStats(std::string_view NamespaceName, std::string_view BucketName, CacheContentStats& OutContentStats) const +{ + if (const ZenCacheNamespace* Namespace = FindNamespace(NamespaceName); Namespace) + { + return Namespace->GetContentStats(BucketName, OutContentStats); + } + return false; +} + std::string ZenCacheStore::GetGcName(GcCtx&) { @@ -1155,7 +1187,7 @@ public: AddedBuckets.size()); }); - std::vector<std::string> AddedNamespaces = m_CacheStore.GetCapturedNamespaces(); + std::vector<std::string> AddedNamespaces = m_CacheStore.GetCapturedNamespacesLocked(); for (const std::string& AddedNamespace : AddedNamespaces) { @@ -1171,7 +1203,7 @@ public: for (auto& NamepaceKV : m_CacheStore.m_Namespaces) { ZenCacheNamespace& Namespace = *NamepaceKV.second; - std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBuckets(); + std::vector<std::string> NamespaceAddedBuckets = Namespace.m_DiskLayer.GetCapturedBucketsLocked(); for (const std::string& AddedBucketName : NamespaceAddedBuckets) { if (auto It = Namespace.m_DiskLayer.m_Buckets.find(AddedBucketName); It != Namespace.m_DiskLayer.m_Buckets.end()) @@ -1183,21 +1215,30 @@ public: for (ZenCacheDiskLayer::CacheBucket* Bucket : AddedBuckets) { - bool Continue = Bucket->GetReferencesLocked(Ctx, m_References); + bool Continue = Bucket->GetReferences(Ctx.Logger, + Ctx.IsCancelledFlag, + /*StateIsAlreadyLocked*/ true, + Ctx.Settings.StoreCacheAttachmentMetaData, + Ctx.Settings.StoreCacheAttachmentMetaData, + m_References, + nullptr); if (!Continue) { break; } } + FilterReferences(Ctx, fmt::format("cachestore [LOCKSTATE] '{}'", "cachestore"), m_References); } - virtual void RemoveUsedReferencesFromSet(GcCtx& Ctx, HashSet& IoCids) override + virtual std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override { - ZEN_TRACE_CPU("Z$::RemoveUsedReferencesFromSet"); + ZEN_TRACE_CPU("Z$::GetUnusedReferences"); auto Log = [&Ctx]() { return Ctx.Logger; }; - size_t InitialCount = IoCids.size(); + const size_t InitialCount = IoCids.size(); + size_t UsedCount = InitialCount; + Stopwatch Timer; const auto _ = MakeGuard([&] { if (!Ctx.Settings.Verbose) @@ -1206,21 +1247,14 @@ public: } ZEN_INFO("GCV2: projectstore [FILTER REFERENCES] '{}': filtered out {} used references out of {} in {}", "projectstore", - InitialCount - IoCids.size(), + UsedCount, InitialCount, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - for (const IoHash& ReferenceHash : m_References) - { - if (IoCids.erase(ReferenceHash) == 1) - { - if (IoCids.empty()) - { - return; - } - } - } + std::span<IoHash> UnusedReferences = KeepUnusedReferences(m_References, IoCids); + UsedCount = IoCids.size() - UnusedReferences.size(); + return UnusedReferences; } private: @@ -1248,6 +1282,12 @@ ZenCacheStore::CreateReferenceCheckers(GcCtx& Ctx) return Checkers; } +std::vector<GcReferenceValidator*> +ZenCacheStore::CreateReferenceValidators(GcCtx& /*Ctx*/) +{ + return {}; +} + ////////////////////////////////////////////////////////////////////////// #if ZEN_WITH_TESTS @@ -1456,186 +1496,7 @@ TEST_CASE("cachestore.size") } } -TEST_CASE("cachestore.gc") -{ - using namespace testutils; - - auto JobQueue = MakeJobQueue(1, "testqueue"); - - SUBCASE("gather references does NOT add references for expired cache entries") - { - ScopedTemporaryDirectory TempDir; - std::vector<IoHash> Cids{CreateKey(1), CreateKey(2), CreateKey(3)}; - - const auto CollectAndFilter = [](GcManager& Gc, - GcClock::TimePoint Time, - GcClock::Duration MaxDuration, - std::span<const IoHash> Cids, - std::vector<IoHash>& OutKeep) { - GcContext GcCtx(Time - MaxDuration, Time - MaxDuration); - Gc.CollectGarbage(GcCtx); - OutKeep.clear(); - GcCtx.FilterCids(Cids, [&OutKeep](const IoHash& Hash) { OutKeep.push_back(Hash); }); - }; - - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const auto Bucket = "teardrinker"sv; - - // Create a cache record - const IoHash Key = CreateKey(42); - CbObjectWriter Record; - Record << "Key"sv - << "SomeRecord"sv; - - for (size_t Idx = 0; auto& Cid : Cids) - { - Record.AddBinaryAttachment(fmt::format("attachment-{}", Idx++), Cid); - } - - IoBuffer Buffer = Record.Save().GetBuffer().AsIoBuffer(); - Buffer.SetContentType(ZenContentType::kCbObject); - - Zcs.Put(Bucket, Key, {.Value = Buffer}, Cids); - - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(Cids.size(), Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - - // Expect timestamps to be serialized - { - GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - std::vector<IoHash> Keep; - - // Collect garbage with 1 hour max cache duration - { - CollectAndFilter(Gc, GcClock::Now(), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(3, Keep.size()); - } - - // Move forward in time - { - CollectAndFilter(Gc, GcClock::Now() + std::chrono::hours(2), std::chrono::hours(1), Cids, Keep); - CHECK_EQ(0, Keep.size()); - } - } - } - - SUBCASE("gc removes standalone values") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const auto Bucket = "fortysixandtwo"sv; - const GcClock::TimePoint CurrentTime = GcClock::Now(); - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = CreateRandomBlob(128 << 10); - Zcs.Put(Bucket, Key, {.Value = Value}, {}); - } - - { - GcContext GcCtx(CurrentTime - std::chrono::hours(46), CurrentTime - std::chrono::hours(46)); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(CurrentTime + std::chrono::minutes(2), CurrentTime + std::chrono::minutes(2)); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } - - SUBCASE("gc removes small objects") - { - ScopedTemporaryDirectory TempDir; - GcManager Gc; - { - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const auto Bucket = "rightintwo"sv; - - std::vector<IoHash> Keys{CreateKey(1), CreateKey(2), CreateKey(3)}; - - for (const auto& Key : Keys) - { - IoBuffer Value = CreateRandomBlob(128); - Zcs.Put(Bucket, Key, {.Value = Value}, {}); - } - - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(2), GcClock::Now() - std::chrono::hours(2)); - GcCtx.CollectSmallObjects(true); - - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(Exists); - } - } - - // Move forward in time and collect again - { - GcContext GcCtx(GcClock::Now() + std::chrono::minutes(2), GcClock::Now() + std::chrono::minutes(2)); - GcCtx.CollectSmallObjects(true); - - Zcs.Flush(); - Gc.CollectGarbage(GcCtx); - - for (const auto& Key : Keys) - { - ZenCacheValue CacheValue; - const bool Exists = Zcs.Get(Bucket, Key, CacheValue); - CHECK(!Exists); - } - // GC could not remove the currently written block so size will not be zero - CHECK_NE(0, Zcs.StorageSize().DiskSize); - } - } - { - // Unreferenced blocks will be pruned so size should now be zero - ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - } - } -} - -TEST_CASE_TEMPLATE("cachestore.threadedinsert", GCV2, FalseType, TrueType) // * doctest::skip(true)) +TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) { // for (uint32_t i = 0; i < 100; ++i) { @@ -1703,39 +1564,24 @@ TEST_CASE_TEMPLATE("cachestore.threadedinsert", GCV2, FalseType, TrueType) // * } } - auto DoGC = [](GcManager& Gc, - ZenCacheNamespace& Zcs, - std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes, - const std::vector<IoHash>& KeepHashes) { - if (GCV2::Enabled) + auto DoGC = [](GcManager& Gc, ZenCacheNamespace& Zcs, std::unordered_map<IoHash, std::string, IoHash::Hasher>& GcChunkHashes) { + GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), + .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), + .CollectSmallObjects = true, + .IsDeleteMode = true, + .CompactBlockUsageThresholdPercent = 100}; + Gc.CollectGarbage(Settings); + // Cheating as we don't get the list of deleted hashes back from this call + std::unordered_map<IoHash, std::string, IoHash::Hasher> RemainingChunkHashes; + for (const auto& It : GcChunkHashes) { - GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), - .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), - .CollectSmallObjects = true, - .IsDeleteMode = true, - .CompactBlockUsageThresholdPercent = 100}; - Gc.CollectGarbage(Settings); - // Cheating as we don't get the list of deleted hashes back from this call - std::unordered_map<IoHash, std::string, IoHash::Hasher> RemainingChunkHashes; - for (const auto& It : GcChunkHashes) + ZenCacheValue Tmp; + if (Zcs.Get(It.second, It.first, Tmp)) { - ZenCacheValue Tmp; - if (Zcs.Get(It.second, It.first, Tmp)) - { - RemainingChunkHashes.insert(It); - } + RemainingChunkHashes.insert(It); } - GcChunkHashes.swap(RemainingChunkHashes); - } - else - { - GcContext GcCtx(GcClock::Now() - std::chrono::hours(24), GcClock::Now() - std::chrono::hours(24)); - GcCtx.CollectSmallObjects(true); - GcCtx.AddRetainedCids(KeepHashes); - Zcs.CollectGarbage(GcCtx); - const HashKeySet& Deleted = GcCtx.DeletedCids(); - Deleted.IterateHashes([&GcChunkHashes](const IoHash& ChunkHash) { GcChunkHashes.erase(ChunkHash); }); } + GcChunkHashes.swap(RemainingChunkHashes); }; const uint64_t TotalSize = Zcs.StorageSize().DiskSize; @@ -1817,32 +1663,7 @@ TEST_CASE_TEMPLATE("cachestore.threadedinsert", GCV2, FalseType, TrueType) // * GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - DoGC(Gc, Zcs, GcChunkHashes, KeepHashes); + DoGC(Gc, Zcs, GcChunkHashes); } while (WorkCompleted < NewChunks.size() + Chunks.size()) @@ -1860,32 +1681,7 @@ TEST_CASE_TEMPLATE("cachestore.threadedinsert", GCV2, FalseType, TrueType) // * GcChunkHashes[Chunk.first] = Chunk.second.Bucket; } } - std::vector<IoHash> KeepHashes; - KeepHashes.reserve(GcChunkHashes.size()); - for (const auto& Entry : GcChunkHashes) - { - KeepHashes.push_back(Entry.first); - } - size_t C = 0; - while (C < KeepHashes.size()) - { - if (C % 155 == 0) - { - if (C < KeepHashes.size() - 1) - { - KeepHashes[C] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - if (C + 3 < KeepHashes.size() - 1) - { - KeepHashes[C + 3] = KeepHashes[KeepHashes.size() - 1]; - KeepHashes.pop_back(); - } - } - C++; - } - - DoGC(Gc, Zcs, GcChunkHashes, KeepHashes); + DoGC(Gc, Zcs, GcChunkHashes); } } { |