diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-20 12:33:03 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-20 12:33:03 +0200 |
| commit | 4c05d1041461b630cd5770dae5e8d03147d5674b (patch) | |
| tree | 3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src/zenstore/include | |
| parent | zen print fixes/improvements (#469) (diff) | |
| download | zen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip | |
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one cas store for project store and cache.
- Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore
- Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub).
- Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize)
- Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace`
Diffstat (limited to 'src/zenstore/include')
5 files changed, 64 insertions, 42 deletions
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index ae5b2014d..49c52f847 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// +struct CacheStoreSize +{ + uint64_t DiskSize = 0; + uint64_t MemorySize = 0; +}; + class ZenCacheDiskLayer { public: @@ -131,8 +137,8 @@ public: struct BucketInfo { - uint64_t EntryCount = 0; - GcStorageSize StorageSize; + uint64_t EntryCount = 0; + CacheStoreSize StorageSize; }; struct Info @@ -141,7 +147,7 @@ public: Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct BucketStats @@ -199,11 +205,10 @@ public: std::function<void()> Drop(); std::function<void()> DropBucket(std::string_view Bucket); void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void DiscoverBuckets(); - GcStorageSize StorageSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + CacheStoreSize TotalSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -220,6 +225,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& Ctx); #endif // ZEN_WITH_TESTS bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const; @@ -227,7 +233,7 @@ public: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket : public GcReferencer + struct CacheBucket : public GcReferencer, public GcStorage { CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, @@ -244,17 +250,22 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - PutResult Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - std::function<void()> Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutResult Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + std::function<void()> Drop(); + void Flush(); + inline CacheStoreSize TotalSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats @@ -277,11 +288,6 @@ public: std::span<const std::size_t> ChunkIndexes, std::vector<IoHash>& OutReferences) const; - inline GcStorageSize StorageSize() const - { - return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), - .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; - } uint64_t EntryCount() const; BucketStats Stats(); @@ -293,6 +299,20 @@ public: void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS + // GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + // GcStorage + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual GcStorageSize StorageSize() const override + { + CacheStoreSize Size = TotalSize(); + return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; + } + private: #pragma pack(push) #pragma pack(1) @@ -391,11 +411,6 @@ public: std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; - virtual std::string GetGcName(GcCtx& Ctx) override; - virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; - virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; - virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult); void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index 104746aba..80340d72c 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -70,11 +70,13 @@ IsCompressedBinary(ZenContentType Type) struct CacheRpcHandler { + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker); ~CacheRpcHandler(); @@ -94,6 +96,8 @@ struct CacheRpcHandler int& OutTargetProcessId, CbPackage& OutPackage); + CidStore& GetCidStore(std::string_view Namespace); + private: CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); @@ -142,7 +146,7 @@ private: CacheStats& m_CacheStats; UpstreamCacheClient& m_UpstreamCache; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; + GetCidStoreFunc m_GetCidStore; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; bool AreDiskWritesAllowed() const; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index b6e8e7565..c51d7312c 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -49,7 +49,7 @@ class JobQueue; */ -class ZenCacheNamespace final : public GcStorage +class ZenCacheNamespace final { public: struct Configuration @@ -107,10 +107,7 @@ public: std::function<void()> Drop(); void Flush(); - // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - + CacheStoreSize TotalSize() const; Configuration GetConfig() const { return m_Configuration; } Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -127,6 +124,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& ScrubCtx); #endif // ZEN_WITH_TESTS private: @@ -140,7 +138,6 @@ private: std::atomic<uint64_t> m_WriteCount{}; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; - uint64_t m_LastScrubTime = 0; ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; @@ -178,7 +175,7 @@ public: Configuration Config; std::vector<std::string> NamespaceNames; uint64_t DiskEntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct NamedNamespaceStats @@ -260,13 +257,12 @@ public: bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, const std::string_view BucketFilter, const std::string_view ValueFilter) const; - GcStorageSize StorageSize() const; + CacheStoreSize TotalSize() const; CacheStoreStats Stats(bool IncludeNamespaceStats = true); Configuration GetConfiguration() const { return m_Configuration; } @@ -296,6 +292,10 @@ public: bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h index 152031c3a..c3993c028 100644 --- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -113,7 +113,7 @@ public: std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) = 0; - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index b3d00fec0..8918b119f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -87,12 +87,15 @@ public: bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CidStoreSize TotalSize() const; CidStoreStats Stats() const; virtual void ReportMetrics(StatsMetrics& Statsd) override; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: struct Impl; std::unique_ptr<CasStore> m_CasStore; |