diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-20 12:33:03 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-20 12:33:03 +0200 |
| commit | 4c05d1041461b630cd5770dae5e8d03147d5674b (patch) | |
| tree | 3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src/zenstore | |
| parent | zen print fixes/improvements (#469) (diff) | |
| download | zen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip | |
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one CAS store for the project store and cache.
- Refactor `UpstreamCacheClient` so it is not tied to a specific `CidStore`.
- Refactor scrub to keep the GC interface `ScrubStorage` function separate from the scrub accessor functions (renamed to `Scrub`).
- Refactor storage size to keep the GC interface `StorageSize` function separate from the size accessor functions (renamed to `TotalSize`).
- Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements the `GcStorage` interface rather than `ZenCacheNamespace`.
Diffstat (limited to 'src/zenstore')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 28 | ||||
| -rw-r--r-- | src/zenstore/cache/cacherpc.cpp | 54 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 68 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 18 | ||||
| -rw-r--r-- | src/zenstore/cas.h | 5 | ||||
| -rw-r--r-- | src/zenstore/cidstore.cpp | 20 | ||||
| -rw-r--r-- | src/zenstore/compactcas.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cachedisklayer.h | 73 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/cacherpc.h | 8 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/structuredcachestore.h | 18 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cache/upstreamcacheclient.h | 2 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/cidstore.h | 5 |
12 files changed, 165 insertions, 136 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 219caca01..cacbbd966 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -798,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); + m_Gc.AddGcStorage(this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -812,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } + m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferencer(*this); } @@ -2587,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, @@ -4417,8 +4419,9 @@ ZenCacheDiskLayer::Flush() } } +#if ZEN_WITH_TESTS void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::ScrubStorage"); @@ -4429,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { -#if 1 +# if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else +# else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); -#endif +# endif } for (auto& Result : Results) @@ -4451,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheDiskLayer::StorageSize() const +CacheStoreSize +ZenCacheDiskLayer::TotalSize() const { - GcStorageSize StorageSize{}; + CacheStoreSize StorageSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : 
m_Buckets) { - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); StorageSize.DiskSize += BucketSize.DiskSize; StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4471,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); @@ -4495,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4510,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()}; } return {}; } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5d9a68919..ff21d1ede 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(InLog) , m_CacheStats(InCacheStats) , m_UpstreamCache(InUpstreamCache) , m_CacheStore(InCacheStore) -, m_CidStore(InCidStore) +, m_GetCidStore(std::move(InGetCidStore)) , 
m_DiskWriteBlocker(InDiskWriteBlocker) { } @@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +CidStore& +CacheRpcHandler::GetCidStore(std::string_view Namespace) +{ + return m_GetCidStore(Namespace); +} + CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, std::string_view UriNamespace, @@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; + CidStore& ChunkStore = m_GetCidStore(Request.Namespace); + Request.RecordObject.IterateAttachments([this, &Request, Package, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, @@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(ValueHash)) + else if (ChunkStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; @@ -448,7 +457,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (!WriteAttachmentBuffers.empty()) { - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) @@ -475,10 +484,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = 
Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } return PutStatus::Success; } @@ -521,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return CbPackage{}; } + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + const bool HasUpstream = m_UpstreamCache.IsActive(); eastl::fixed_vector<RecordRequestData, 16> Requests; @@ -620,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - if (m_CidStore.ContainsChunk(Value.ContentId)) + if (ChunkStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } @@ -641,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId)) { if (Chunk.GetSize() > 0) { @@ -665,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } if (!RequestValueIndexes.empty()) { - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( CidHashes, [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try @@ -758,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -827,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Value.Exists = true; if (StoreLocal) { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), 
Attachment->GetHash()); + ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -944,6 +957,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + std::vector<ZenCacheStore::PutResult> BatchResults; eastl::fixed_vector<size_t, 32> BatchResultIndexes; eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; @@ -1094,7 +1109,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } } { @@ -1546,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + CidStore& ChunkStore = m_GetCidStore(Namespace); + // TODO: BatchGet records? 
std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -1684,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) + if (ChunkStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; } } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId)) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { @@ -1822,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, return; } + CidStore& ChunkStore = m_GetCidStore(Namespace); + CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; @@ -1841,7 +1861,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Params.Value, Params.RawHash); + ChunkStore.AddChunk(Params.Value, Params.RawHash); } else { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 973af52b2..1f2d6c37f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st CreateDirectories(m_RootDir); m_DiskLayer.DiscoverBuckets(); - - m_Gc.AddGcStorage(this); } ZenCacheNamespace::~ZenCacheNamespace() { - m_Gc.RemoveGcStorage(this); } struct ZenCacheNamespace::PutBatchHandle @@ -302,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view std::function<void()> 
ZenCacheNamespace::Drop() { - m_Gc.RemoveGcStorage(this); return m_DiskLayer.Drop(); } @@ -312,25 +308,19 @@ ZenCacheNamespace::Flush() m_DiskLayer.Flush(); } +#if ZEN_WITH_TESTS void -ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) +ZenCacheNamespace::Scrub(ScrubContext& Ctx) { - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - ZEN_INFO("scrubbing '{}'", m_RootDir); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_DiskLayer.ScrubStorage(Ctx); + m_DiskLayer.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheNamespace::StorageSize() const +CacheStoreSize +ZenCacheNamespace::TotalSize() const { - return m_DiskLayer.StorageSize(); + return m_DiskLayer.TotalSize(); } ZenCacheNamespace::Info @@ -836,11 +826,13 @@ ZenCacheStore::Flush() IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } +#if ZEN_WITH_TESTS void -ZenCacheStore::ScrubStorage(ScrubContext& Ctx) +ZenCacheStore::Scrub(ScrubContext& Ctx) { - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); } +#endif // ZEN_WITH_TESTS CacheValueDetails ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, @@ -965,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names } } -GcStorageSize -ZenCacheStore::StorageSize() const +CacheStoreSize +ZenCacheStore::TotalSize() const { - GcStorageSize Size; + CacheStoreSize Size; IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { - GcStorageSize StoreSize = Store.StorageSize(); + CacheStoreSize StoreSize = Store.TotalSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; }); @@ -1040,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig) ZenCacheStore::Info ZenCacheStore::GetInfo() const { - ZenCacheStore::Info Info = {.Config = m_Configuration, 
.StorageSize = StorageSize()}; + ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()}; IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { Info.NamespaceNames.push_back(std::string(NamespaceName)); @@ -1428,7 +1420,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1449,7 +1441,7 @@ TEST_CASE("cachestore.size") Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); @@ -1459,7 +1451,7 @@ TEST_CASE("cachestore.size") Zcs.Get(Key.first, Key.second, _); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); } @@ -1468,7 +1460,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1476,8 +1468,8 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - CHECK_EQ(0, Zcs.StorageSize().MemorySize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().MemorySize); } } @@ -1486,7 +1478,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1503,7 +1495,7 @@ TEST_CASE("cachestore.size") Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), 
{.Value = Buffer}, {}, false); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); } @@ -1512,7 +1504,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1520,7 +1512,7 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); } } } @@ -1613,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) GcChunkHashes.swap(RemainingChunkHashes); }; - const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + const uint64_t TotalSize = Zcs.TotalSize().DiskSize; CHECK_LE(kChunkSize * Chunks.size(), TotalSize); { @@ -1971,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; - const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size, Size & 0xff); @@ -2108,8 +2098,8 @@ TEST_CASE("cachestore.scrub") WorkerThreadPool ThreadPool{1}; ScrubContext ScrubCtx{ThreadPool}; - Zcs.ScrubStorage(ScrubCtx); - CidStore.ScrubStorage(ScrubCtx); + Zcs.Scrub(ScrubCtx); + CidStore.Scrub(ScrubCtx); CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); CHECK(ScrubCtx.BadCids().GetSize() == 0); } diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 5879a6742..6b89beb3d 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -73,9 +73,12 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) override; virtual void Flush() override; - virtual void 
ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) override; +#endif // ZEN_WITH_TESTS + private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; @@ -463,24 +466,19 @@ CasImpl::Flush() m_LargeStrategy.Flush(); } +#if ZEN_WITH_TESTS void -CasImpl::ScrubStorage(ScrubContext& Ctx) +CasImpl::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetCasTag()); ZEN_TRACE_CPU("Cas::ScrubStorage"); - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - m_SmallStrategy.ScrubStorage(Ctx); m_TinyStrategy.ScrubStorage(Ctx); m_LargeStrategy.ScrubStorage(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CasImpl::TotalSize() const @@ -523,7 +521,7 @@ TEST_CASE("CasStore") WorkerThreadPool ThreadPool{1}; ScrubContext Ctx{ThreadPool}; - Store->ScrubStorage(Ctx); + Store->Scrub(Ctx); IoBuffer Value1{16}; memcpy(Value1.MutableData(), "1234567890123456", 16); diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index e279dd2cc..0f6e2ba9d 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -50,12 +50,13 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; - virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) = 0; +#endif // ZEN_WITH_TESTS protected: CidStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; }; ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 2ab769d04..ae1b59dc0 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -127,17 +127,9 @@ struct CidStore::Impl void Flush() { m_CasStore.Flush(); } - void ScrubStorage(ScrubContext& Ctx) - { - if (Ctx.ScrubTimestamp() == m_LastScrubTime) - { - return; - } - - m_LastScrubTime = 
Ctx.ScrubTimestamp(); - - m_CasStore.ScrubStorage(Ctx); - } +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreStats Stats() { @@ -236,11 +228,13 @@ CidStore::Flush() m_Impl->Flush(); } +#if ZEN_WITH_TESTS void -CidStore::ScrubStorage(ScrubContext& Ctx) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->ScrubStorage(Ctx); + m_Impl->Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CidStore::TotalSize() const diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 15e4cbf81..32c256a42 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -68,10 +68,10 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void Flush(); // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; virtual GcStorageSize StorageSize() const override; + // GcReferenceStore virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index ae5b2014d..49c52f847 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// +struct CacheStoreSize +{ + uint64_t DiskSize = 0; + uint64_t MemorySize = 0; +}; + class ZenCacheDiskLayer { public: @@ -131,8 +137,8 @@ public: struct BucketInfo { - uint64_t EntryCount = 0; - GcStorageSize StorageSize; + uint64_t EntryCount = 0; + CacheStoreSize StorageSize; }; struct Info @@ -141,7 +147,7 @@ public: Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct BucketStats @@ -199,11 +205,10 @@ public: 
std::function<void()> Drop(); std::function<void()> DropBucket(std::string_view Bucket); void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void DiscoverBuckets(); - GcStorageSize StorageSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + CacheStoreSize TotalSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -220,6 +225,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& Ctx); #endif // ZEN_WITH_TESTS bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const; @@ -227,7 +233,7 @@ public: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket : public GcReferencer + struct CacheBucket : public GcReferencer, public GcStorage { CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, @@ -244,17 +250,22 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - PutResult Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - std::function<void()> Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutResult Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + std::function<void()> Drop(); + void Flush(); + inline CacheStoreSize 
TotalSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats @@ -277,11 +288,6 @@ public: std::span<const std::size_t> ChunkIndexes, std::vector<IoHash>& OutReferences) const; - inline GcStorageSize StorageSize() const - { - return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), - .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; - } uint64_t EntryCount() const; BucketStats Stats(); @@ -293,6 +299,20 @@ public: void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS + // GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + // GcStorage + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual GcStorageSize StorageSize() const override + { + CacheStoreSize Size = TotalSize(); + return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; + } + private: #pragma pack(push) #pragma pack(1) @@ -391,11 +411,6 @@ public: std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; - virtual std::string GetGcName(GcCtx& Ctx) override; - virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; - virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; - virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, 
ZenCacheDiskLayer::PutResult& OutPutResult); void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index 104746aba..80340d72c 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -70,11 +70,13 @@ IsCompressedBinary(ZenContentType Type) struct CacheRpcHandler { + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker); ~CacheRpcHandler(); @@ -94,6 +96,8 @@ struct CacheRpcHandler int& OutTargetProcessId, CbPackage& OutPackage); + CidStore& GetCidStore(std::string_view Namespace); + private: CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); @@ -142,7 +146,7 @@ private: CacheStats& m_CacheStats; UpstreamCacheClient& m_UpstreamCache; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; + GetCidStoreFunc m_GetCidStore; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; bool AreDiskWritesAllowed() const; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index b6e8e7565..c51d7312c 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -49,7 +49,7 @@ class JobQueue; */ -class ZenCacheNamespace final : public GcStorage +class ZenCacheNamespace final { public: struct Configuration @@ -107,10 +107,7 @@ public: std::function<void()> Drop(); void Flush(); - // GcStorage - virtual void 
ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - + CacheStoreSize TotalSize() const; Configuration GetConfig() const { return m_Configuration; } Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -127,6 +124,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& ScrubCtx); #endif // ZEN_WITH_TESTS private: @@ -140,7 +138,6 @@ private: std::atomic<uint64_t> m_WriteCount{}; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; - uint64_t m_LastScrubTime = 0; ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; @@ -178,7 +175,7 @@ public: Configuration Config; std::vector<std::string> NamespaceNames; uint64_t DiskEntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct NamedNamespaceStats @@ -260,13 +257,12 @@ public: bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, const std::string_view BucketFilter, const std::string_view ValueFilter) const; - GcStorageSize StorageSize() const; + CacheStoreSize TotalSize() const; CacheStoreStats Stats(bool IncludeNamespaceStats = true); Configuration GetConfiguration() const { return m_Configuration; } @@ -296,6 +292,10 @@ public: bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h 
b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h index 152031c3a..c3993c028 100644 --- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -113,7 +113,7 @@ public: std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) = 0; - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index b3d00fec0..8918b119f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -87,12 +87,15 @@ public: bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CidStoreSize TotalSize() const; CidStoreStats Stats() const; virtual void ReportMetrics(StatsMetrics& Statsd) override; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: struct Impl; std::unique_ptr<CasStore> m_CasStore; |