diff options
| author | Dan Engelbrecht <[email protected]> | 2025-08-20 12:33:03 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-08-20 12:33:03 +0200 |
| commit | 4c05d1041461b630cd5770dae5e8d03147d5674b (patch) | |
| tree | 3f5d6b1b4b2b3f167f94e98f902a5f60c2e3d753 /src | |
| parent | zen print fixes/improvements (#469) (diff) | |
| download | zen-4c05d1041461b630cd5770dae5e8d03147d5674b.tar.xz zen-4c05d1041461b630cd5770dae5e8d03147d5674b.zip | |
per namespace/project cas prep refactor (#470)
- Refactor so we can have more than one cas store for project store and cache.
- Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore
- Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub).
- Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize)
- Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace`
Diffstat (limited to 'src')
24 files changed, 703 insertions, 480 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 9d982678a..8c2e6d771 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -17,14 +17,11 @@ # include <mimalloc.h> #endif -#include <zenstore/cidstore.h> #include <zenstore/gc.h> -#include <zenstore/buildstore/buildstore.h> #include <zenstore/cache/structuredcachestore.h> #include <zenutil/workerpools.h> #include "config.h" -#include "projectstore/projectstore.h" #include <chrono> @@ -104,17 +101,13 @@ GetStatsForStateDirectory(std::filesystem::path StateDir) HttpAdminService::HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue, ZenCacheStore* CacheStore, - CidStore* CidStore, - ProjectStore* ProjectStore, - BuildStore* BuildStore, + std::function<void()>&& FlushFunction, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions) : m_GcScheduler(Scheduler) , m_BackgroundJobQueue(BackgroundJobQueue) , m_CacheStore(CacheStore) -, m_CidStore(CidStore) -, m_ProjectStore(ProjectStore) -, m_BuildStore(BuildStore) +, m_FlushFunction(std::move(FlushFunction)) , m_LogPaths(LogPaths) , m_ServerOptions(ServerOptions) { @@ -783,22 +776,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, "flush", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); - if (m_CidStore) - { - m_CidStore->Flush(); - } - if (m_CacheStore) - { - m_CacheStore->Flush(); - } - if (m_ProjectStore) - { - m_ProjectStore->Flush(); - } - if (m_BuildStore) - { - m_BuildStore->Flush(); - } + m_FlushFunction(); HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kPost); diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index e7821dead..9a49f5120 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -4,15 +4,13 @@ #include <zencore/compactbinary.h> #include <zenhttp/httpserver.h> +#include <functional> namespace zen { class GcScheduler; class JobQueue; class ZenCacheStore; -class CidStore; -class ProjectStore; -class BuildStore; struct ZenServerOptions; class HttpAdminService : public zen::HttpService @@ -27,9 +25,7 @@ public: HttpAdminService(GcScheduler& Scheduler, JobQueue& BackgroundJobQueue, ZenCacheStore* CacheStore, - CidStore* CidStore, - ProjectStore* ProjectStore, - BuildStore* BuildStore, + std::function<void()>&& FlushFunction, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions); ~HttpAdminService(); @@ -42,9 +38,7 @@ private: GcScheduler& m_GcScheduler; JobQueue& m_BackgroundJobQueue; ZenCacheStore* m_CacheStore; - CidStore* m_CidStore; - ProjectStore* m_ProjectStore; - BuildStore* m_BuildStore; + std::function<void()> m_FlushFunction; LogPaths m_LogPaths; const ZenServerOptions& m_ServerOptions; }; diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index acb6053d9..19ac3a216 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -85,7 +85,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& GetCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, @@ -95,11 +95,10 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_OpenProcessCache(InOpenProcessCache) -, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) +, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -131,24 +130,6 @@ HttpStructuredCacheService::Flush() } void -HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - ZEN_INFO("scrubbing '{}'", Info.BasePath); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CidStore.ScrubStorage(Ctx); - m_CacheStore.ScrubStorage(Ctx); -} - -void HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); @@ -243,6 +224,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; @@ -252,7 +236,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() << ", " << gsl::narrow<uint64_t>(Payload.GetSize()); @@ -270,7 +254,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } CSVWriter << ", " << gsl::narrow<uint64_t>(AttachmentsSize); @@ -292,6 +276,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + Cbo.BeginObject(); { Cbo.AddString("name", Namespace); @@ -334,7 +321,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { Cbo.BeginObject(); Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); Cbo.AddInteger("size", gsl::narrow<uint64_t>(Payload.GetSize())); Cbo.EndObject(); } @@ -348,7 +335,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } Cbo.AddInteger("attachmentssize", gsl::narrow<uint64_t>(AttachmentsSize)); @@ -623,6 +610,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) { ResponseWriter.BeginObject("BucketSizes"); @@ -681,7 +670,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque uint64_t AttachmentsSize = 0; - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( AllAttachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -749,6 +738,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") { CacheContentStats ContentStats; @@ -775,7 +766,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( ContentStats.Attachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -850,6 +841,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { @@ -864,17 +857,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -974,6 +967,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { Success = true; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); @@ -1024,6 +1019,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRecord.IterateAttachments([this, &Package, &Ref, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ReferencedAttachments, @@ -1058,12 +1054,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (SkipData) { - if (m_CidStore.ContainsChunk(Hash)) + if (ChunkStore.ContainsChunk(Hash)) { Count.Valid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -1105,7 +1101,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { std::vector<CidStore::InsertResult> InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& Result : InsertResults) { if (Result.New) @@ -1272,7 +1268,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + m_UpstreamCache.EnqueueUpstream( + {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", @@ -1306,15 +1305,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con std::vector<IoHash> ReferencedAttachments; int32_t TotalCount = 0; - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - ReferencedAttachments.push_back(Hash); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments( + [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + ReferencedAttachments.push_back(Hash); + if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); @@ -1348,10 +1350,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CachePolicy Policy = PolicyFromUrl; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1384,38 +1388,46 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con WriteAttachmentBuffers.reserve(NumAttachments); WriteRawHashes.reserve(NumAttachments); - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( - CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Hash); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &ChunkStore, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ValidAttachments, + &ReferencedAttachments, + &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) { + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } + } + else if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + Count.Total++; + }); if (Count.Invalid > 0) { @@ -1439,7 +1451,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& InsertResult : InsertResults) { if (InsertResult.New) @@ -1466,10 +1478,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1503,7 +1517,9 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { Stopwatch Timer; - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; @@ -1525,7 +1541,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (AreDiskWritesAllowed()) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + ChunkStore.AddChunk(UpstreamResult.Value, RawHash); } Source = UpstreamResult.Source; } @@ -1619,7 +1635,9 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1834,16 +1852,51 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t WriteCount = m_CacheStats.WriteCount; - const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; - struct CidStoreStats StoreStats = m_CidStore.Stats(); - const uint64_t ChunkHitCount = StoreStats.HitCount; - const uint64_t ChunkMissCount = StoreStats.MissCount; - const uint64_t ChunkWriteCount = StoreStats.WriteCount; - const uint64_t TotalCount = HitCount + MissCount; + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t WriteCount = m_CacheStats.WriteCount; + const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; + + uint64_t TotalChunkHitCount = 0; + uint64_t TotalChunkMissCount = 0; + uint64_t TotalChunkWriteCount = 0; + CidStoreSize TotalCidSize; + + tsl::robin_map<CidStore*, std::string> UniqueStores; + { + std::vector<std::string> NamespaceNames = m_CacheStore.GetNamespaces(); + + for (const std::string& NamespaceName : NamespaceNames) + { + CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName); + if (auto It = UniqueStores.find(Store); It == UniqueStores.end()) + { + UniqueStores.insert_or_assign(Store, NamespaceName); + } + else + { + UniqueStores.insert_or_assign(Store, std::string{}); + } + } + + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + + CidStoreStats StoreStats = ChunkStore->Stats(); + CidStoreSize StoreSize = ChunkStore->TotalSize(); + + TotalChunkHitCount += StoreStats.HitCount; + TotalChunkMissCount += StoreStats.MissCount; + TotalChunkWriteCount += StoreStats.WriteCount; + + TotalCidSize.TinySize += StoreSize.TinySize; + TotalCidSize.SmallSize += StoreSize.SmallSize; + TotalCidSize.LargeSize += StoreSize.LargeSize; + TotalCidSize.TotalSize += StoreSize.TotalSize; + } + } const uint64_t RpcRequests = m_CacheStats.RpcRequests; const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; @@ -1853,17 +1906,11 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const GcStorageSize CacheSize = m_CacheStore.StorageSize(); + const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; - CidStoreStats CidStoreStats = {}; - if (ShowCidStoreStats) - { - CidStoreStats = m_CidStore.Stats(); - } ZenCacheStore::CacheStoreStats CacheStoreStats = {}; if (ShowCacheStoreStats) { @@ -1898,6 +1945,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo.EndObject(); Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; + const uint64_t TotalCount = HitCount + MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); if (m_UpstreamCache.IsActive()) @@ -1908,7 +1956,9 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); } - Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; + Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount; + const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount; + Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0); if (ShowCacheStoreStats) { @@ -2017,20 +2067,58 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { Cbo.BeginObject("size"); { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; + Cbo << "tiny" << TotalCidSize.TinySize; + Cbo << "small" << TotalCidSize.SmallSize; + Cbo << "large" << TotalCidSize.LargeSize; + Cbo << "total" << TotalCidSize.TotalSize; } Cbo.EndObject(); if (ShowCidStoreStats) { Cbo.BeginObject("store"); - Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; - EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); - EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); - // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); + + auto OutputStats = [&](CidStore& ChunkStore) { + CidStoreStats StoreStats = ChunkStore.Stats(); + Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount; + const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount; + Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0); + EmitSnapshot("read", StoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", StoreStats.AddChunkOps, Cbo); + }; + + if (UniqueStores.size() > 1) + { + Cbo.BeginArray("namespaces"); + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + const std::string& Namespace = It.second; + CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); + Cbo.BeginObject(); + { + Cbo << "namespace" << Namespace; + Cbo.BeginObject("stats"); + OutputStats(*ChunkStore); + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + Cbo << "tiny" << ChunkStoreSize.TinySize; + Cbo << "small" << ChunkStoreSize.SmallSize; + Cbo << "large" << ChunkStoreSize.LargeSize; + Cbo << "total" << ChunkStoreSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); // namespaces + } + else if (UniqueStores.size() != 0) + { + OutputStats(*UniqueStores.begin()->first); + } Cbo.EndObject(); } } diff --git a/src/zenserver/cache/httpstructuredcache.h b/src/zenserver/cache/httpstructuredcache.h index 13c1d6475..d46ca145d 100644 --- a/src/zenserver/cache/httpstructuredcache.h +++ b/src/zenserver/cache/httpstructuredcache.h @@ -70,8 +70,10 @@ namespace cache { class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& GetCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, @@ -83,7 +85,6 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; void Flush(); - void ScrubStorage(ScrubContext& Ctx); private: struct CacheRef @@ -116,9 +117,7 @@ private: ZenCacheStore& m_CacheStore; HttpStatsService& m_StatsService; HttpStatusService& m_StatusService; - CidStore& m_CidStore; UpstreamCache& m_UpstreamCache; - uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 317a419eb..9600133f3 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -235,13 +235,11 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CidStore& Store, - ProjectStore* Projects, +HttpProjectService::HttpProjectService(ProjectStore* Projects, HttpStatusService& StatusService, HttpStatsService& StatsService, AuthMgr& AuthMgr) : m_Log(logging::Get("project")) -, m_CidStore(Store) , m_ProjectStore(Projects) , m_StatusService(StatusService) , m_StatsService(StatsService) @@ -407,8 +405,45 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) { ZEN_TRACE_CPU("ProjectService::Stats"); - const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); - const CidStoreSize CidSize = m_CidStore.TotalSize(); + bool ShowCidStoreStats = HttpReq.GetQueryParams().GetValue("cidstorestats") == "true"; + + const GcStorageSize StoreSize = m_ProjectStore->StorageSize(); + uint64_t TotalChunkHitCount = 0; + uint64_t TotalChunkMissCount = 0; + uint64_t TotalChunkWriteCount = 0; + CidStoreSize TotalCidSize; + + tsl::robin_map<CidStore*, std::string> UniqueStores; + { + m_ProjectStore->IterateProjects([&UniqueStores](ProjectStore::Project& Project) { + CidStore* Store = &Project.GetCidStore(); + if (auto It = UniqueStores.find(Store); It == UniqueStores.end()) + { + UniqueStores.insert_or_assign(Store, Project.Identifier); + } + else + { + UniqueStores.insert_or_assign(Store, std::string{}); + } + }); + + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + + CidStoreStats ChunkStoreStats = ChunkStore->Stats(); + CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); + + TotalChunkHitCount += ChunkStoreStats.HitCount; + TotalChunkMissCount += ChunkStoreStats.MissCount; + TotalChunkWriteCount += ChunkStoreStats.WriteCount; + + TotalCidSize.TinySize += ChunkStoreSize.TinySize; + TotalCidSize.SmallSize += ChunkStoreSize.SmallSize; + TotalCidSize.LargeSize += ChunkStoreSize.LargeSize; + TotalCidSize.TotalSize += ChunkStoreSize.TotalSize; + } + } CbObjectWriter Cbo; @@ -460,12 +495,66 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) { Cbo.BeginObject("size"); { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; + Cbo << "tiny" << TotalCidSize.TinySize; + Cbo << "small" << TotalCidSize.SmallSize; + Cbo << "large" << TotalCidSize.LargeSize; + Cbo << "total" << TotalCidSize.TotalSize; } Cbo.EndObject(); + + if (ShowCidStoreStats) + { + Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount; + const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount; + Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0); + + Cbo.BeginObject("store"); + + auto OutputStats = [&](CidStore& ChunkStore) { + CidStoreStats StoreStats = ChunkStore.Stats(); + Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount; + const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount; + Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0); + EmitSnapshot("read", StoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", StoreStats.AddChunkOps, Cbo); + }; + + if (UniqueStores.size() > 1) + { + Cbo.BeginArray("projects"); + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + const std::string& ProjectId = It.second; + CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); + + Cbo.BeginObject(); + { + Cbo << "project" << ProjectId; + Cbo.BeginObject("stats"); + OutputStats(*ChunkStore); + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + Cbo << "tiny" << ChunkStoreSize.TinySize; + Cbo << "small" << ChunkStoreSize.SmallSize; + Cbo << "large" << ChunkStoreSize.LargeSize; + Cbo << "total" << ChunkStoreSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); // projects + } + else if (UniqueStores.size() != 0) + { + CidStore& ChunkStore = *UniqueStores.begin()->first; + OutputStats(ChunkStore); + } + Cbo.EndObject(); + } } Cbo.EndObject(); @@ -1125,6 +1214,8 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) } Project->TouchOplog(OplogId); + CidStore& ChunkStore = Project->GetCidStore(); + ProjectStore::Oplog& Oplog = *FoundLog; IoBuffer Payload = HttpReq.ReadPayload(); @@ -1137,7 +1228,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) std::vector<IoHash> MissingChunks; CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { - if (m_CidStore.ContainsChunk(Hash)) + if (ChunkStore.ContainsChunk(Hash)) { // Return null attachment as we already have it, no point in reading it and storing it again return {}; @@ -1393,6 +1484,8 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) } Project->TouchOplog(OplogId); + CidStore& ChunkStore = Project->GetCidStore(); + ProjectStore::Oplog& Oplog = *FoundLog; if (const std::optional<int32_t> OpId = ParseInt<uint32_t>(OpIdString)) @@ -1407,7 +1500,7 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) Op.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsAttachment(); - IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); + IoBuffer Payload = ChunkStore.FindChunkByCid(AttachmentHash); if (Payload) { switch (Payload.GetContentType()) @@ -2036,11 +2129,14 @@ HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req) CSVHeader(Details, AttachmentDetails, CSVWriter); m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { + CidStore& ChunkStore = Project.GetCidStore(); + Project.IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); + Oplog.IterateOplogWithKey([this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, + const Oid& Key, + CbObjectView Op) { + CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); }); }); @@ -2054,8 +2150,9 @@ HttpProjectService::HandleDetailsRequest(HttpRouterRequest& Req) m_ProjectStore->DiscoverProjects(); m_ProjectStore->IterateProjects([&](ProjectStore::Project& Project) { - std::vector<std::string> OpLogs = Project.ScanForOplogs(); - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + CidStore& ChunkStore = Project.GetCidStore(); + std::vector<std::string> OpLogs = Project.ScanForOplogs(); + CbWriteProject(ChunkStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); }); } Cbo.EndArray(); @@ -2084,7 +2181,8 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - ProjectStore::Project& Project = *FoundProject.Get(); + ProjectStore::Project& Project = *FoundProject.Get(); + CidStore& ChunkStore = Project.GetCidStore(); if (CSV) { @@ -2092,10 +2190,11 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) CSVHeader(Details, AttachmentDetails, CSVWriter); FoundProject->IterateOplogs([&](const RwLock::SharedLockScope&, ProjectStore::Oplog& Oplog) { - Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); - }); + Oplog.IterateOplogWithKey([this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, + const Oid& Key, + CbObjectView Op) { + CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + }); }); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } @@ -2105,7 +2204,7 @@ HttpProjectService::HandleProjectDetailsRequest(HttpRouterRequest& Req) std::vector<std::string> OpLogs = FoundProject->ScanForOplogs(); Cbo.BeginArray("projects"); { - CbWriteProject(m_CidStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); + CbWriteProject(ChunkStore, Project, OpLogs, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); @@ -2141,16 +2240,17 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; + ProjectStore::Project& Project = *FoundProject.Get(); + CidStore& ChunkStore = Project.GetCidStore(); + ProjectStore::Oplog& Oplog = *FoundLog; if (CSV) { ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); Oplog.IterateOplogWithKey( - [this, &Project, &Oplog, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) { - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); + [this, &Project, &Oplog, &ChunkStore, &CSVWriter, Details, AttachmentDetails](uint32_t LSN, const Oid& Key, CbObjectView Op) { + CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN, Key, Op, CSVWriter); }); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } @@ -2159,7 +2259,7 @@ HttpProjectService::HandleOplogDetailsRequest(HttpRouterRequest& Req) CbObjectWriter Cbo; Cbo.BeginArray("oplogs"); { - CbWriteOplog(m_CidStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); + CbWriteOplog(ChunkStore, Oplog, Details, OpDetails, AttachmentDetails, Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); @@ -2204,9 +2304,10 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) fmt::format("Chunk info request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, OpId)); } - const Oid ObjId = Oid::FromHexString(OpId); - ProjectStore::Project& Project = *FoundProject.Get(); - ProjectStore::Oplog& Oplog = *FoundLog; + const Oid ObjId = Oid::FromHexString(OpId); + ProjectStore::Project& Project = *FoundProject.Get(); + CidStore& ChunkStore = Project.GetCidStore(); + ProjectStore::Oplog& Oplog = *FoundLog; std::optional<CbObject> Op = Oplog.GetOpByKey(ObjId); if (!Op.has_value()) @@ -2224,7 +2325,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) ExtendableStringBuilder<4096> CSVWriter; CSVHeader(Details, AttachmentDetails, CSVWriter); - CSVWriteOp(m_CidStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN.value(), ObjId, Op.value(), CSVWriter); + CSVWriteOp(ChunkStore, Project.Identifier, Oplog.OplogId(), Details, AttachmentDetails, LSN.value(), ObjId, Op.value(), CSVWriter); HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, CSVWriter.ToView()); } else @@ -2232,7 +2333,7 @@ HttpProjectService::HandleOplogOpDetailsRequest(HttpRouterRequest& Req) CbObjectWriter Cbo; Cbo.BeginArray("ops"); { - CbWriteOp(m_CidStore, Details, OpDetails, AttachmentDetails, LSN.value(), ObjId, Op.value(), Cbo); + CbWriteOp(ChunkStore, Details, OpDetails, AttachmentDetails, LSN.value(), ObjId, Op.value(), Cbo); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 295defa5c..5782188e6 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -35,11 +35,7 @@ class ProjectStore; class HttpProjectService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider { public: - HttpProjectService(CidStore& Store, - ProjectStore* InProjectStore, - HttpStatusService& StatusService, - HttpStatsService& StatsService, - AuthMgr& AuthMgr); + HttpProjectService(ProjectStore* InProjectStore, HttpStatusService& StatusService, HttpStatsService& StatsService, AuthMgr& AuthMgr); ~HttpProjectService(); virtual const char* BaseUri() const override; @@ -92,7 +88,6 @@ private: inline LoggerRef Log() { return m_Log; } LoggerRef m_Log; - CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; HttpStatusService& m_StatusService; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 61c279afa..d6dd6ef9b 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1176,7 +1176,7 @@ ProjectStore::Oplog::Flush() } void -ProjectStore::Oplog::ScrubStorage(ScrubContext& Ctx) +ProjectStore::Oplog::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); @@ -3575,7 +3575,7 @@ ProjectStore::Project::Flush() } void -ProjectStore::Project::ScrubStorage(ScrubContext& Ctx) +ProjectStore::Project::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetProjectstoreTag()); // Scrubbing needs to check all existing oplogs @@ -3587,7 +3587,7 @@ ProjectStore::Project::ScrubStorage(ScrubContext& Ctx) IterateOplogs([&](const RwLock::SharedLockScope&, Oplog& Ops) { if (!IsExpired(GcClock::TimePoint::min(), Ops)) { - Ops.ScrubStorage(Ctx); + Ops.Scrub(Ctx); } }); } @@ -3832,7 +3832,7 @@ ProjectStore::Project::LastOplogAccessTime(std::string_view Oplog) const ////////////////////////////////////////////////////////////////////////// -ProjectStore::ProjectStore(CidStore& Store, +ProjectStore::ProjectStore(GetCidStoreFunc&& GetCidStore, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, @@ -3840,7 +3840,7 @@ ProjectStore::ProjectStore(CidStore& Store, const Configuration& Config) : m_Log(logging::Get("project")) , m_Gc(Gc) -, m_CidStore(Store) +, m_GetCidStore(std::move(GetCidStore)) , m_JobQueue(JobQueue) , m_OpenProcessCache(InOpenProcessCache) , m_ProjectBasePath(BasePath) @@ -3973,7 +3973,7 @@ ProjectStore::ScrubStorage(ScrubContext& Ctx) } for (const Ref<Project>& Project : Projects) { - Project->ScrubStorage(Ctx); + Project->Scrub(Ctx); } } @@ -4025,6 +4025,8 @@ ProjectStore::OpenProject(std::string_view ProjectId) } } + CidStore& ChunkStore = m_GetCidStore(ProjectId); + RwLock::ExclusiveLockScope _(m_ProjectsLock); if (auto ProjIt = m_Projects.find(std::string{ProjectId}); ProjIt != m_Projects.end()) { @@ -4041,7 +4043,7 @@ ProjectStore::OpenProject(std::string_view ProjectId) Ref<Project>& Prj = m_Projects - .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + .try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, ChunkStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->Read(); @@ -4068,12 +4070,14 @@ ProjectStore::NewProject(const std::filesystem::path& BasePath, ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::NewProject"); + CidStore& ChunkStore = m_GetCidStore(ProjectId); + RwLock::ExclusiveLockScope _(m_ProjectsLock); ZEN_INFO("project '{}': creating project at '{}'", ProjectId, BasePath); Ref<Project>& Prj = - m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, m_CidStore, BasePath))) + m_Projects.try_emplace(std::string{ProjectId}, Ref<ProjectStore::Project>(new ProjectStore::Project(this, ChunkStore, BasePath))) .first->second; Prj->Identifier = ProjectId; Prj->RootDir = RootDir; @@ -4802,7 +4806,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, } const IoHash Hash = IoHash::FromHexString(Cid); - OutChunk = m_CidStore.FindChunkByCid(Hash); + OutChunk = Project->GetCidStore().FindChunkByCid(Hash); if (!OutChunk) { @@ -4865,7 +4869,7 @@ ProjectStore::PutChunk(const std::string_view ProjectId, } FoundLog->CaptureAddedAttachments(std::vector<IoHash>{Hash}); - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash); + CidStore::InsertResult Result = Project->GetCidStore().AddChunk(Chunk, Hash); return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}}; } @@ -4894,18 +4898,19 @@ ProjectStore::GetChunks(const std::string_view ProjectId, } Project->TouchOplog(OplogId); + CidStore& ChunkStore = Project->GetCidStore(); + if (RequestObject["chunks"sv].IsArray()) { // Legacy full chunks only by rawhash - CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView(); - + CbArrayView ChunksArray = RequestObject["chunks"sv].AsArrayView(); CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("chunks"sv); for (CbFieldView FieldView : ChunksArray) { IoHash RawHash = FieldView.AsHash(); - IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash); + IoBuffer ChunkBuffer = ChunkStore.FindChunkByCid(RawHash); if (ChunkBuffer) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)); @@ -5057,7 +5062,7 @@ ProjectStore::GetChunks(const std::string_view ProjectId, if (ChunkRequest.Input.Id.index() == 0) { const IoHash& ChunkHash = std::get<IoHash>(ChunkRequest.Input.Id); - IoBuffer Payload = m_CidStore.FindChunkByCid(ChunkHash); + IoBuffer Payload = ChunkStore.FindChunkByCid(ChunkHash); if (Payload) { ChunkRequest.Output.Exists = true; @@ -5244,7 +5249,7 @@ ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_vie return {HttpResponseCode::BadRequest, "Invalid payload format"}; } - CidStore& ChunkStore = m_CidStore; + CidStore& ChunkStore = Project->GetCidStore(); RwLock AttachmentsLock; tsl::robin_set<IoHash, IoHash::Hasher> Attachments; @@ -5350,7 +5355,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, } } - CidStore& ChunkStore = m_CidStore; + CidStore& ChunkStore = Project->GetCidStore(); RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer( ChunkStore, @@ -5462,6 +5467,8 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Project->TouchOplog(OplogId); + CidStore& ChunkStore = Project->GetCidStore(); + if (Method == "import"sv) { if (!AreDiskWritesAllowed()) @@ -5543,7 +5550,7 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Oplog->CaptureAddedAttachments(WriteRawHashes); - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); } HttpReq.WriteResponse(HttpResponseCode::OK); return true; @@ -5716,14 +5723,14 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, ResponseObj.EndArray(); } - // Ops that have moved chunks to a compressed buffer for storage in m_CidStore have been rewritten with references to the new - // chunk(s). Make sure we add the chunks to m_CidStore, and do it after we update the oplog so GC doesn't think we have + // Ops that have moved chunks to a compressed buffer for storage in ChunkStore have been rewritten with references to the new + // chunk(s). Make sure we add the chunks to ChunkStore, and do it after we update the oplog so GC doesn't think we have // unreferenced chunks. for (auto It : AddedChunks) { const IoHash& RawHash = It.first; AddedChunk& Chunk = It.second; - CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk.Buffer, RawHash); + CidStore::InsertResult Result = ChunkStore.AddChunk(Chunk.Buffer, RawHash); if (Result.New) { InlinedBytes += Chunk.RawSize; @@ -5786,7 +5793,7 @@ ProjectStore::Export(Ref<ProjectStore::Project> Project, ProjectStore::Oplog& Op EmbedLooseFile, Force, IgnoreMissingAttachments](JobContext& Context) { - RemoteProjectStore::Result Result = SaveOplog(m_CidStore, + RemoteProjectStore::Result Result = SaveOplog(Project->GetCidStore(), *ActualRemoteStore, *Project.Get(), *OplogPtr, @@ -5831,14 +5838,20 @@ ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, } std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); + CidStore& ChunkStore = Project.GetCidStore(); ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description); JobId JobId = m_JobQueue.QueueJob( fmt::format("Import oplog '{}/{}'", Project.Identifier, Oplog.OplogId()), - [this, ActualRemoteStore = std::move(RemoteStore), OplogPtr = &Oplog, Force, IgnoreMissingAttachments, CleanOplog]( - JobContext& Context) { + [this, + ChunkStore = &ChunkStore, + ActualRemoteStore = std::move(RemoteStore), + OplogPtr = &Oplog, + Force, + IgnoreMissingAttachments, + CleanOplog](JobContext& Context) { RemoteProjectStore::Result Result = - LoadOplog(m_CidStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context); + LoadOplog(*ChunkStore, *ActualRemoteStore, *OplogPtr, Force, IgnoreMissingAttachments, CleanOplog, &Context); auto Response = ConvertResult(Result); ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) @@ -6877,6 +6890,11 @@ namespace testutils { return BuildChunksRequest<IoHash>(SkipData, "RawHash", Chunks, Ranges, ModTags); } + ProjectStore::GetCidStoreFunc SingleChunkStore(CidStore& ChunkStore) + { + return [ChunkStore = &ChunkStore](std::string_view) -> CidStore& { return *ChunkStore; }; + } + } // namespace testutils TEST_CASE("project.opkeys") @@ -6939,13 +6957,18 @@ TEST_CASE("project.store.create") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::string_view ProjectName("proj1"sv); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -6970,12 +6993,17 @@ TEST_CASE("project.store.lifetimes") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -7033,12 +7061,17 @@ TEST_CASE_TEMPLATE("project.store.export", auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; @@ -7072,7 +7105,7 @@ TEST_CASE_TEMPLATE("project.store.export", std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - RemoteProjectStore::Result ExportResult = SaveOplog(CidStore, + RemoteProjectStore::Result ExportResult = SaveOplog(ChunkStore, *RemoteStore, *Project.Get(), *Oplog, @@ -7089,7 +7122,7 @@ TEST_CASE_TEMPLATE("project.store.export", ProjectStore::Oplog* OplogImport = Project->NewOplog("oplog2", {}); CHECK(OplogImport != nullptr); - RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, + RemoteProjectStore::Result ImportResult = LoadOplog(ChunkStore, *RemoteStore, *OplogImport, /*Force*/ false, @@ -7098,7 +7131,7 @@ TEST_CASE_TEMPLATE("project.store.export", nullptr); CHECK(ImportResult.ErrorCode == 0); - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, + RemoteProjectStore::Result ImportForceResult = LoadOplog(ChunkStore, *RemoteStore, *OplogImport, /*Force*/ true, @@ -7107,7 +7140,7 @@ TEST_CASE_TEMPLATE("project.store.export", nullptr); CHECK(ImportForceResult.ErrorCode == 0); - RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, + RemoteProjectStore::Result ImportCleanResult = LoadOplog(ChunkStore, *RemoteStore, *OplogImport, /*Force*/ false, @@ -7116,7 +7149,7 @@ TEST_CASE_TEMPLATE("project.store.export", nullptr); CHECK(ImportCleanResult.ErrorCode == 0); - RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, + RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(ChunkStore, *RemoteStore, *OplogImport, /*Force*/ true, @@ -7136,12 +7169,17 @@ TEST_CASE("project.store.gc") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -7337,12 +7375,17 @@ TEST_CASE("project.store.gc.prep") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; @@ -7385,7 +7428,7 @@ TEST_CASE("project.store.gc.prep") // Equivalent of a `prep` existance check call for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7399,7 +7442,7 @@ TEST_CASE("project.store.gc.prep") // If a gc comes in between our prep and op write the chunks will be removed for (auto Attachment : OpAttachments) { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7419,7 +7462,7 @@ TEST_CASE("project.store.gc.prep") for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7433,7 +7476,7 @@ TEST_CASE("project.store.gc.prep") // Attachments should now be retained for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7447,7 +7490,7 @@ TEST_CASE("project.store.gc.prep") // Attachments should now be retained across multiple GCs if retain time is still valud for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7459,7 +7502,7 @@ TEST_CASE("project.store.gc.prep") } for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { Ref<ProjectStore::Project> Project1 = ProjectStore.OpenProject("proj1"sv); @@ -7476,7 +7519,7 @@ TEST_CASE("project.store.gc.prep") for (auto Attachment : OpAttachments) { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } { @@ -7509,7 +7552,7 @@ TEST_CASE("project.store.gc.prep") } for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } Sleep(200); @@ -7524,7 +7567,7 @@ TEST_CASE("project.store.gc.prep") } for (auto Attachment : OpAttachments) { - CHECK(CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } // This pass the retention time has expired and the last GC pass cleared the entries @@ -7538,7 +7581,7 @@ TEST_CASE("project.store.gc.prep") for (auto Attachment : OpAttachments) { - CHECK(!CidStore.ContainsChunk(Attachment.second.DecodeRawHash())); + CHECK(!ChunkStore.ContainsChunk(Attachment.second.DecodeRawHash())); } } @@ -7552,12 +7595,17 @@ TEST_CASE("project.store.rpc.getchunks") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; @@ -8474,12 +8522,17 @@ TEST_CASE("project.store.partial.read") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "engine"sv; @@ -8652,12 +8705,17 @@ TEST_CASE("project.store.iterateoplog") auto JobQueue = MakeJobQueue(1, ""sv); OpenProcessCache ProcessCache; GcManager Gc; - CidStore CidStore(Gc); + CidStore ChunkStore(Gc); CidStoreConfiguration CidConfig = {.RootDirectory = TempDir.Path() / "cas"sv, .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; - CidStore.Initialize(CidConfig); + ChunkStore.Initialize(CidConfig); std::filesystem::path BasePath = TempDir.Path() / "projectstore"sv; - ProjectStore ProjectStore(CidStore, BasePath, Gc, *JobQueue, ProcessCache, ProjectStore::Configuration{}); + ProjectStore ProjectStore(testutils::SingleChunkStore(ChunkStore), + BasePath, + Gc, + *JobQueue, + ProcessCache, + ProjectStore::Configuration{}); std::filesystem::path RootDir = TempDir.Path() / "root"sv; std::filesystem::path EngineRootDir = TempDir.Path() / "enginesv"; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 368da5ea4..eb27665f9 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -68,7 +68,9 @@ public: { }; - ProjectStore(CidStore& Store, + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + + ProjectStore(GetCidStoreFunc&& GetCidStore, std::filesystem::path BasePath, GcManager& Gc, JobQueue& JobQueue, @@ -156,7 +158,7 @@ public: LoggerRef Log() { return m_OuterProject->Log(); } void Flush(); - void ScrubStorage(ScrubContext& Ctx); + void Scrub(ScrubContext& Ctx); static uint64_t TotalSize(const std::filesystem::path& BasePath); uint64_t TotalSize() const; @@ -326,11 +328,13 @@ public: Project(ProjectStore* PrjStore, CidStore& Store, std::filesystem::path BasePath); virtual ~Project(); + CidStore& GetCidStore() { return m_CidStore; }; + void Read(); void Write(); [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath); void Flush(); - void ScrubStorage(ScrubContext& Ctx); + void Scrub(ScrubContext& Ctx); LoggerRef Log() const; static uint64_t TotalSize(const std::filesystem::path& BasePath); uint64_t TotalSize() const; @@ -405,6 +409,7 @@ public: LoggerRef Log() { return m_Log; } const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } + // GcStorage virtual void ScrubStorage(ScrubContext& Ctx) override; virtual GcStorageSize StorageSize() const override; @@ -498,7 +503,7 @@ public: private: LoggerRef m_Log; GcManager& m_Gc; - CidStore& m_CidStore; + GetCidStoreFunc m_GetCidStore; JobQueue& m_JobQueue; OpenProcessCache& m_OpenProcessCache; std::filesystem::path m_ProjectBasePath; diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 744b861dd..a1c460bc0 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1475,12 +1475,17 @@ namespace detail { class UpstreamCacheImpl final : public UpstreamCache { + struct EnqueuedRequest + { + UpstreamCacheRecord Record; + std::function<IoBuffer(const IoHash& ChunkHash)> GetValueFunc; + }; + public: - UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + UpstreamCacheImpl(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) - , m_CidStore(CidStore) { } @@ -1836,17 +1841,17 @@ public: } } - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) override { if (m_RunState.IsRunning && m_Options.WriteUpstream && m_Endpoints.size() > 0) { if (!m_UpstreamThreads.empty()) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + m_UpstreamQueue.Enqueue(EnqueuedRequest{.Record = std::move(CacheRecord), .GetValueFunc = GetValueFunc}); } else { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(std::move(CacheRecord), std::move(GetValueFunc)); } } } @@ -1900,7 +1905,7 @@ public: } private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + void ProcessCacheRecord(const UpstreamCacheRecord& CacheRecord, std::function<IoBuffer(const IoHash& ChunkHash)>&& GetValueFunc) { ZEN_TRACE_CPU("Upstream::ProcessCacheRecord"); @@ -1918,7 +1923,7 @@ private: for (const IoHash& ValueContentId : CacheRecord.ValueContentIds) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(ValueContentId)) + if (IoBuffer Payload = GetValueFunc(ValueContentId)) { Payloads.push_back(Payload); } @@ -1970,19 +1975,19 @@ private: for (;;) { - UpstreamCacheRecord CacheRecord; - if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + EnqueuedRequest Request; + if (m_UpstreamQueue.WaitAndDequeue(Request)) { try { - ProcessCacheRecord(std::move(CacheRecord)); + ProcessCacheRecord(Request.Record, std::move(Request.GetValueFunc)); } catch (const std::exception& Err) { ZEN_ERROR("upload cache record '{}/{}/{}' FAILED, reason '{}'", - CacheRecord.Namespace, - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, + Request.Record.Namespace, + Request.Record.Key.Bucket, + Request.Record.Key.Hash, Err.what()); } } @@ -2076,7 +2081,7 @@ private: LoggerRef Log() { return m_Log; } - using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<EnqueuedRequest>; struct RunState { @@ -2102,7 +2107,6 @@ private: LoggerRef m_Log; UpstreamCacheOptions m_Options; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; std::shared_mutex m_EndpointsMutex; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; @@ -2126,9 +2130,9 @@ UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, con } std::unique_ptr<UpstreamCache> -CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) +CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore) { - return std::make_unique<UpstreamCacheImpl>(Options, CacheStore, CidStore); + return std::make_unique<UpstreamCacheImpl>(Options, CacheStore); } } // namespace zen diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h index 26e5decac..e4b9a73ad 100644 --- a/src/zenserver/upstream/upstreamcache.h +++ b/src/zenserver/upstream/upstreamcache.h @@ -24,7 +24,6 @@ class AuthMgr; class CbObjectView; class CbPackage; class CbObjectWriter; -class CidStore; class ZenCacheStore; struct JupiterClientOptions; class JupiterAccessTokenProvider; @@ -162,6 +161,6 @@ struct UpstreamCacheOptions bool WriteUpstream = true; }; -std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); +std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore); } // namespace zen diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index ba1413819..5cab54acc 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -239,18 +239,18 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; - m_CidStore = std::make_unique<CidStore>(m_GcManager); - m_CidStore->Initialize(Config); + m_CidStores.insert_or_assign({}, std::make_unique<CidStore>(m_GcManager)); + m_CidStores.at({})->Initialize(Config); ZEN_INFO("instantiating project service"); - m_ProjectStore = new ProjectStore(*m_CidStore, + m_ProjectStore = new ProjectStore([this](std::string_view) -> CidStore& { return *m_CidStores.at({}).get(); }, m_DataRoot / "projects", m_GcManager, *m_JobQueue, *m_OpenProcessCache, ProjectStore::Configuration{}); - m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr}); + m_HttpProjectService.reset(new HttpProjectService(m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr)); if (ServerOptions.WorksSpacesConfig.Enabled) { @@ -362,17 +362,15 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_GcScheduler.Initialize(GcConfig); // Create and register admin interface last to make sure all is properly initialized - m_AdminService = - std::make_unique<HttpAdminService>(m_GcScheduler, - *m_JobQueue, - m_CacheStore.Get(), - m_CidStore.get(), - m_ProjectStore, - m_BuildStore.get(), - HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, - .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", - .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, - ServerOptions); + m_AdminService = std::make_unique<HttpAdminService>( + m_GcScheduler, + *m_JobQueue, + m_CacheStore.Get(), + [this]() { Flush(); }, + HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, + .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", + .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, + ServerOptions); m_Http->RegisterService(*m_AdminService); return EffectiveBasePort; @@ -595,7 +593,7 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); } - m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamCache = CreateUpstreamCache(UpstreamOptions, *m_CacheStore); m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache, *m_AuthMgr); m_UpstreamCache->Initialize(); @@ -659,19 +657,23 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) } } - m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>(*m_CacheStore, - *m_CidStore, - m_StatsService, - m_StatusService, - *m_UpstreamCache, - m_GcManager.GetDiskWriteBlocker(), - *m_OpenProcessCache); + m_StructuredCacheService = std::make_unique<HttpStructuredCacheService>( + *m_CacheStore, + [this](std::string_view) -> CidStore& { return *m_CidStores.at({}).get(); }, + m_StatsService, + m_StatusService, + *m_UpstreamCache, + m_GcManager.GetDiskWriteBlocker(), + *m_OpenProcessCache); m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); m_StatsReporter.AddProvider(m_CacheStore.Get()); - m_StatsReporter.AddProvider(m_CidStore.get()); + for (const auto& It : m_CidStores) + { + m_StatsReporter.AddProvider(It.second.get()); + } m_StatsReporter.AddProvider(m_BuildCidStore.get()); } @@ -857,7 +859,7 @@ ZenServer::Cleanup() m_Workspaces.reset(); m_HttpProjectService.reset(); m_ProjectStore = {}; - m_CidStore.reset(); + m_CidStores.clear(); m_AuthService.reset(); m_AuthMgr.reset(); m_Http = {}; @@ -1038,40 +1040,12 @@ ZenServer::CheckOwnerPid() } void -ZenServer::ScrubStorage() -{ - Stopwatch Timer; - ZEN_INFO("Storage validation STARTING"); - - WorkerThreadPool ThreadPool{1, "Scrub"}; - ScrubContext Ctx{ThreadPool}; - - if (m_CidStore) - m_CidStore->ScrubStorage(Ctx); - - if (m_ProjectStore) - m_ProjectStore->ScrubStorage(Ctx); - - if (m_StructuredCacheService) - m_StructuredCacheService->ScrubStorage(Ctx); - - if (m_BuildCidStore) - m_BuildCidStore->ScrubStorage(Ctx); - - const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); - - ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", - NiceTimeSpanMs(ElapsedTimeMs), - NiceBytes(Ctx.ScrubbedBytes()), - Ctx.ScrubbedChunks(), - NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); -} - -void ZenServer::Flush() { - if (m_CidStore) - m_CidStore->Flush(); + for (auto& It : m_CidStores) + { + It.second->Flush(); + } if (m_StructuredCacheService) m_StructuredCacheService->Flush(); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 0dcab6ec4..0a17446ae 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -82,7 +82,6 @@ public: void CheckStateExitFlag(); void CheckOwnerPid(); bool UpdateProcessMonitor(); - void ScrubStorage(); void Flush(); virtual void HandleStatusRequest(HttpServerRequest& Request) override; @@ -116,20 +115,20 @@ private: inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } static std::string_view ToString(ServerState Value); - StatsReporter m_StatsReporter; - Ref<HttpServer> m_Http; - std::unique_ptr<AuthMgr> m_AuthMgr; - std::unique_ptr<HttpAuthService> m_AuthService; - HttpStatusService m_StatusService; - HttpStatsService m_StatsService; - GcManager m_GcManager; - GcScheduler m_GcScheduler{m_GcManager}; - std::unique_ptr<CidStore> m_CidStore; - Ref<ZenCacheStore> m_CacheStore; - std::unique_ptr<OpenProcessCache> m_OpenProcessCache; - HttpTestService m_TestService; - std::unique_ptr<CidStore> m_BuildCidStore; - std::unique_ptr<BuildStore> m_BuildStore; + StatsReporter m_StatsReporter; + Ref<HttpServer> m_Http; + std::unique_ptr<AuthMgr> m_AuthMgr; + std::unique_ptr<HttpAuthService> m_AuthService; + HttpStatusService m_StatusService; + HttpStatsService m_StatsService; + GcManager m_GcManager; + GcScheduler m_GcScheduler{m_GcManager}; + tsl::robin_map<std::string, std::unique_ptr<CidStore>> m_CidStores; + Ref<ZenCacheStore> m_CacheStore; + std::unique_ptr<OpenProcessCache> m_OpenProcessCache; + HttpTestService m_TestService; + std::unique_ptr<CidStore> m_BuildCidStore; + std::unique_ptr<BuildStore> m_BuildStore; #if ZEN_WITH_TESTS HttpTestingService m_TestingService; diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 219caca01..cacbbd966 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -798,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); + m_Gc.AddGcStorage(this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -812,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } + m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferencer(*this); } @@ -2587,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, @@ -4417,8 +4419,9 @@ ZenCacheDiskLayer::Flush() } } +#if ZEN_WITH_TESTS void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::ScrubStorage"); @@ -4429,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { -#if 1 +# if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( std::packaged_task<void()>{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else +# else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); -#endif +# endif } for (auto& Result : Results) @@ -4451,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheDiskLayer::StorageSize() const +CacheStoreSize +ZenCacheDiskLayer::TotalSize() const { - GcStorageSize StorageSize{}; + CacheStoreSize StorageSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); StorageSize.DiskSize += BucketSize.DiskSize; StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4471,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); @@ -4495,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4510,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()}; } return {}; } diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5d9a68919..ff21d1ede 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(InLog) , m_CacheStats(InCacheStats) , m_UpstreamCache(InUpstreamCache) , m_CacheStore(InCacheStore) -, m_CidStore(InCidStore) +, m_GetCidStore(std::move(InGetCidStore)) , m_DiskWriteBlocker(InDiskWriteBlocker) { } @@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +CidStore& +CacheRpcHandler::GetCidStore(std::string_view Namespace) +{ + return m_GetCidStore(Namespace); +} + CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, std::string_view UriNamespace, @@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; + CidStore& ChunkStore = m_GetCidStore(Request.Namespace); + Request.RecordObject.IterateAttachments([this, &Request, Package, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, @@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(ValueHash)) + else if (ChunkStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; @@ -448,7 +457,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (!WriteAttachmentBuffers.empty()) { - std::vector<CidStore::InsertResult> InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector<CidStore::InsertResult> InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) @@ -475,10 +484,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } return PutStatus::Success; } @@ -521,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return CbPackage{}; } + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + const bool HasUpstream = m_UpstreamCache.IsActive(); eastl::fixed_vector<RecordRequestData, 16> Requests; @@ -620,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - if (m_CidStore.ContainsChunk(Value.ContentId)) + if (ChunkStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } @@ -641,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId)) { if (Chunk.GetSize() > 0) { @@ -665,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } if (!RequestValueIndexes.empty()) { - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( CidHashes, [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try @@ -758,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -827,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Value.Exists = true; if (StoreLocal) { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -944,6 +957,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + std::vector<ZenCacheStore::PutResult> BatchResults; eastl::fixed_vector<size_t, 32> BatchResultIndexes; eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results; @@ -1094,7 +1109,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } } { @@ -1546,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + CidStore& ChunkStore = m_GetCidStore(Namespace); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -1684,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) + if (ChunkStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; } } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId)) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { @@ -1822,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, return; } + CidStore& ChunkStore = m_GetCidStore(Namespace); + CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; @@ -1841,7 +1861,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Params.Value, Params.RawHash); + ChunkStore.AddChunk(Params.Value, Params.RawHash); } else { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 973af52b2..1f2d6c37f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -139,13 +139,10 @@ ZenCacheNamespace::ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const st CreateDirectories(m_RootDir); m_DiskLayer.DiscoverBuckets(); - - m_Gc.AddGcStorage(this); } ZenCacheNamespace::~ZenCacheNamespace() { - m_Gc.RemoveGcStorage(this); } struct ZenCacheNamespace::PutBatchHandle @@ -302,7 +299,6 @@ ZenCacheNamespace::EnumerateBucketContents(std::string_view std::function<void()> ZenCacheNamespace::Drop() { - m_Gc.RemoveGcStorage(this); return m_DiskLayer.Drop(); } @@ -312,25 +308,19 @@ ZenCacheNamespace::Flush() m_DiskLayer.Flush(); } +#if ZEN_WITH_TESTS void -ZenCacheNamespace::ScrubStorage(ScrubContext& Ctx) +ZenCacheNamespace::Scrub(ScrubContext& Ctx) { - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - ZEN_INFO("scrubbing '{}'", m_RootDir); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_DiskLayer.ScrubStorage(Ctx); + m_DiskLayer.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheNamespace::StorageSize() const +CacheStoreSize +ZenCacheNamespace::TotalSize() const { - return m_DiskLayer.StorageSize(); + return m_DiskLayer.TotalSize(); } ZenCacheNamespace::Info @@ -836,11 +826,13 @@ ZenCacheStore::Flush() IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Flush(); }); } +#if ZEN_WITH_TESTS void -ZenCacheStore::ScrubStorage(ScrubContext& Ctx) +ZenCacheStore::Scrub(ScrubContext& Ctx) { - IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.ScrubStorage(Ctx); }); + IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { Store.Scrub(Ctx); }); } +#endif // ZEN_WITH_TESTS CacheValueDetails ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, @@ -965,12 +957,12 @@ ZenCacheStore::IterateNamespaces(const std::function<void(std::string_view Names } } -GcStorageSize -ZenCacheStore::StorageSize() const +CacheStoreSize +ZenCacheStore::TotalSize() const { - GcStorageSize Size; + CacheStoreSize Size; IterateNamespaces([&](std::string_view, ZenCacheNamespace& Store) { - GcStorageSize StoreSize = Store.StorageSize(); + CacheStoreSize StoreSize = Store.TotalSize(); Size.MemorySize += StoreSize.MemorySize; Size.DiskSize += StoreSize.DiskSize; }); @@ -1040,7 +1032,7 @@ ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig) ZenCacheStore::Info ZenCacheStore::GetInfo() const { - ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = StorageSize()}; + ZenCacheStore::Info Info = {.Config = m_Configuration, .StorageSize = TotalSize()}; IterateNamespaces([&Info](std::string_view NamespaceName, ZenCacheNamespace& Namespace) { Info.NamespaceNames.push_back(std::string(NamespaceName)); @@ -1428,7 +1420,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1449,7 +1441,7 @@ TEST_CASE("cachestore.size") Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); @@ -1459,7 +1451,7 @@ TEST_CASE("cachestore.size") Zcs.Get(Key.first, Key.second, _); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.MemorySize); } @@ -1468,7 +1460,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1476,8 +1468,8 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); - CHECK_EQ(0, Zcs.StorageSize().MemorySize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().MemorySize); } } @@ -1486,7 +1478,7 @@ TEST_CASE("cachestore.size") const size_t Count = 16; ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; + CacheStoreSize CacheSize; { GcManager Gc; @@ -1503,7 +1495,7 @@ TEST_CASE("cachestore.size") Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false); } - CacheSize = Zcs.StorageSize(); + CacheSize = Zcs.TotalSize(); CHECK_LE(CacheValue.GetSize() * Count, CacheSize.DiskSize); CHECK_EQ(0, CacheSize.MemorySize); } @@ -1512,7 +1504,7 @@ TEST_CASE("cachestore.size") GcManager Gc; ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path() / "cache", {}); - const GcStorageSize SerializedSize = Zcs.StorageSize(); + const CacheStoreSize SerializedSize = Zcs.TotalSize(); CHECK_EQ(SerializedSize.MemorySize, 0); CHECK_LE(SerializedSize.DiskSize, CacheSize.DiskSize); @@ -1520,7 +1512,7 @@ TEST_CASE("cachestore.size") { Zcs.DropBucket(fmt::format("test_bucket-{}", Bucket)); } - CHECK_EQ(0, Zcs.StorageSize().DiskSize); + CHECK_EQ(0, Zcs.TotalSize().DiskSize); } } } @@ -1613,7 +1605,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) GcChunkHashes.swap(RemainingChunkHashes); }; - const uint64_t TotalSize = Zcs.StorageSize().DiskSize; + const uint64_t TotalSize = Zcs.TotalSize().DiskSize; CHECK_LE(kChunkSize * Chunks.size(), TotalSize); { @@ -1971,8 +1963,6 @@ TEST_CASE("cachestore.blocked.disklayer.put") { ScopedTemporaryDirectory TempDir; - GcStorageSize CacheSize; - const auto CreateCacheValue = [](size_t Size) -> CbObject { std::vector<uint8_t> Buf; Buf.resize(Size, Size & 0xff); @@ -2108,8 +2098,8 @@ TEST_CASE("cachestore.scrub") WorkerThreadPool ThreadPool{1}; ScrubContext ScrubCtx{ThreadPool}; - Zcs.ScrubStorage(ScrubCtx); - CidStore.ScrubStorage(ScrubCtx); + Zcs.Scrub(ScrubCtx); + CidStore.Scrub(ScrubCtx); CHECK(ScrubCtx.ScrubbedChunks() == (StructuredCids.size() + StructuredCids.size() * AttachmentSizes.size()) + UnstructuredCids.size()); CHECK(ScrubCtx.BadCids().GetSize() == 0); } diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index 5879a6742..6b89beb3d 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -73,9 +73,12 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) override; virtual void Flush() override; - virtual void ScrubStorage(ScrubContext& Ctx) override; virtual CidStoreSize TotalSize() const override; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) override; +#endif // ZEN_WITH_TESTS + private: CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; @@ -463,24 +466,19 @@ CasImpl::Flush() m_LargeStrategy.Flush(); } +#if ZEN_WITH_TESTS void -CasImpl::ScrubStorage(ScrubContext& Ctx) +CasImpl::Scrub(ScrubContext& Ctx) { ZEN_MEMSCOPE(GetCasTag()); ZEN_TRACE_CPU("Cas::ScrubStorage"); - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - m_SmallStrategy.ScrubStorage(Ctx); m_TinyStrategy.ScrubStorage(Ctx); m_LargeStrategy.ScrubStorage(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CasImpl::TotalSize() const @@ -523,7 +521,7 @@ TEST_CASE("CasStore") WorkerThreadPool ThreadPool{1}; ScrubContext Ctx{ThreadPool}; - Store->ScrubStorage(Ctx); + Store->Scrub(Ctx); IoBuffer Value1{16}; memcpy(Value1.MutableData(), "1234567890123456", 16); diff --git a/src/zenstore/cas.h b/src/zenstore/cas.h index e279dd2cc..0f6e2ba9d 100644 --- a/src/zenstore/cas.h +++ b/src/zenstore/cas.h @@ -50,12 +50,13 @@ public: WorkerThreadPool* OptionalWorkerPool, uint64_t LargeSizeLimit) = 0; virtual void Flush() = 0; - virtual void ScrubStorage(ScrubContext& Ctx) = 0; virtual CidStoreSize TotalSize() const = 0; +#if ZEN_WITH_TESTS + virtual void Scrub(ScrubContext& Ctx) = 0; +#endif // ZEN_WITH_TESTS protected: CidStoreConfiguration m_Config; - uint64_t m_LastScrubTime = 0; }; ZENCORE_API std::unique_ptr<CasStore> CreateCasStore(GcManager& Gc); diff --git a/src/zenstore/cidstore.cpp b/src/zenstore/cidstore.cpp index 2ab769d04..ae1b59dc0 100644 --- a/src/zenstore/cidstore.cpp +++ b/src/zenstore/cidstore.cpp @@ -127,17 +127,9 @@ struct CidStore::Impl void Flush() { m_CasStore.Flush(); } - void ScrubStorage(ScrubContext& Ctx) - { - if (Ctx.ScrubTimestamp() == m_LastScrubTime) - { - return; - } - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CasStore.ScrubStorage(Ctx); - } +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx) { m_CasStore.Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreStats Stats() { @@ -236,11 +228,13 @@ CidStore::Flush() m_Impl->Flush(); } +#if ZEN_WITH_TESTS void -CidStore::ScrubStorage(ScrubContext& Ctx) +CidStore::Scrub(ScrubContext& Ctx) { - m_Impl->ScrubStorage(Ctx); + m_Impl->Scrub(Ctx); } +#endif // ZEN_WITH_TESTS CidStoreSize CidStore::TotalSize() const diff --git a/src/zenstore/compactcas.h b/src/zenstore/compactcas.h index 15e4cbf81..32c256a42 100644 --- a/src/zenstore/compactcas.h +++ b/src/zenstore/compactcas.h @@ -68,10 +68,10 @@ struct CasContainerStrategy final : public GcStorage, public GcReferenceStore void Flush(); // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; virtual GcStorageSize StorageSize() const override; + // GcReferenceStore virtual std::string GetGcName(GcCtx& Ctx) override; virtual GcReferencePruner* CreateReferencePruner(GcCtx& Ctx, GcReferenceStoreStats& Stats) override; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index ae5b2014d..49c52f847 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -106,6 +106,12 @@ static_assert(sizeof(DiskIndexEntry) == 32); ////////////////////////////////////////////////////////////////////////// +struct CacheStoreSize +{ + uint64_t DiskSize = 0; + uint64_t MemorySize = 0; +}; + class ZenCacheDiskLayer { public: @@ -131,8 +137,8 @@ public: struct BucketInfo { - uint64_t EntryCount = 0; - GcStorageSize StorageSize; + uint64_t EntryCount = 0; + CacheStoreSize StorageSize; }; struct Info @@ -141,7 +147,7 @@ public: Configuration Config; std::vector<std::string> BucketNames; uint64_t EntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct BucketStats @@ -199,11 +205,10 @@ public: std::function<void()> Drop(); std::function<void()> DropBucket(std::string_view Bucket); void Flush(); - void ScrubStorage(ScrubContext& Ctx); - void DiscoverBuckets(); - GcStorageSize StorageSize() const; - DiskStats Stats() const; + void DiscoverBuckets(); + CacheStoreSize TotalSize() const; + DiskStats Stats() const; Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -220,6 +225,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& Ctx); #endif // ZEN_WITH_TESTS bool GetContentStats(std::string_view BucketName, CacheContentStats& OutContentStats) const; @@ -227,7 +233,7 @@ public: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket : public GcReferencer + struct CacheBucket : public GcReferencer, public GcStorage { CacheBucket(GcManager& Gc, std::atomic_uint64_t& OuterCacheMemoryUsage, @@ -244,17 +250,22 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - PutResult Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - std::function<void()> Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutResult Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + std::function<void()> Drop(); + void Flush(); + inline CacheStoreSize TotalSize() const + { + return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), + .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; + } + RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats @@ -277,11 +288,6 @@ public: std::span<const std::size_t> ChunkIndexes, std::vector<IoHash>& OutReferences) const; - inline GcStorageSize StorageSize() const - { - return {.DiskSize = m_StandaloneSize.load(std::memory_order::relaxed) + m_BlockStore.TotalSize(), - .MemorySize = m_MemCachedSize.load(std::memory_order::relaxed)}; - } uint64_t EntryCount() const; BucketStats Stats(); @@ -293,6 +299,20 @@ public: void SetAccessTime(const IoHash& HashKey, GcClock::TimePoint Time); #endif // ZEN_WITH_TESTS + // GcReferencer + virtual std::string GetGcName(GcCtx& Ctx) override; + virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; + virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; + virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; + + // GcStorage + virtual void ScrubStorage(ScrubContext& Ctx) override; + virtual GcStorageSize StorageSize() const override + { + CacheStoreSize Size = TotalSize(); + return {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; + } + private: #pragma pack(push) #pragma pack(1) @@ -391,11 +411,6 @@ public: std::atomic_uint64_t m_StandaloneSize{}; std::atomic_uint64_t m_MemCachedSize{}; - virtual std::string GetGcName(GcCtx& Ctx) override; - virtual GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override; - virtual std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override; - virtual std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; bool ShouldRejectPut(const IoHash& HashKey, ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult); void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References); diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index 104746aba..80340d72c 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -70,11 +70,13 @@ IsCompressedBinary(ZenContentType Type) struct CacheRpcHandler { + typedef std::function<CidStore&(std::string_view Context)> GetCidStoreFunc; + CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker); ~CacheRpcHandler(); @@ -94,6 +96,8 @@ struct CacheRpcHandler int& OutTargetProcessId, CbPackage& OutPackage); + CidStore& GetCidStore(std::string_view Namespace); + private: CbPackage HandleRpcPutCacheRecords(const CacheRequestContext& Context, const CbPackage& BatchRequest); CbPackage HandleRpcGetCacheRecords(const CacheRequestContext& Context, CbObjectView BatchRequest); @@ -142,7 +146,7 @@ private: CacheStats& m_CacheStats; UpstreamCacheClient& m_UpstreamCache; ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; + GetCidStoreFunc m_GetCidStore; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; bool AreDiskWritesAllowed() const; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index b6e8e7565..c51d7312c 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -49,7 +49,7 @@ class JobQueue; */ -class ZenCacheNamespace final : public GcStorage +class ZenCacheNamespace final { public: struct Configuration @@ -107,10 +107,7 @@ public: std::function<void()> Drop(); void Flush(); - // GcStorage - virtual void ScrubStorage(ScrubContext& ScrubCtx) override; - virtual GcStorageSize StorageSize() const override; - + CacheStoreSize TotalSize() const; Configuration GetConfig() const { return m_Configuration; } Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; @@ -127,6 +124,7 @@ public: #if ZEN_WITH_TESTS void SetAccessTime(std::string_view Bucket, const IoHash& HashKey, GcClock::TimePoint Time); + void Scrub(ScrubContext& ScrubCtx); #endif // ZEN_WITH_TESTS private: @@ -140,7 +138,6 @@ private: std::atomic<uint64_t> m_WriteCount{}; metrics::RequestStats m_PutOps; metrics::RequestStats m_GetOps; - uint64_t m_LastScrubTime = 0; ZenCacheNamespace(const ZenCacheNamespace&) = delete; ZenCacheNamespace& operator=(const ZenCacheNamespace&) = delete; @@ -178,7 +175,7 @@ public: Configuration Config; std::vector<std::string> NamespaceNames; uint64_t DiskEntryCount = 0; - GcStorageSize StorageSize; + CacheStoreSize StorageSize; }; struct NamedNamespaceStats @@ -260,13 +257,12 @@ public: bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CacheValueDetails GetValueDetails(const std::string_view NamespaceFilter, const std::string_view BucketFilter, const std::string_view ValueFilter) const; - GcStorageSize StorageSize() const; + CacheStoreSize TotalSize() const; CacheStoreStats Stats(bool IncludeNamespaceStats = true); Configuration GetConfiguration() const { return m_Configuration; } @@ -296,6 +292,10 @@ public: bool GetContentStats(std::string_view Namespace, std::string_view BucketName, CacheContentStats& OutContentStats) const; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); diff --git a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h index 152031c3a..c3993c028 100644 --- a/src/zenstore/include/zenstore/cache/upstreamcacheclient.h +++ b/src/zenstore/include/zenstore/cache/upstreamcacheclient.h @@ -113,7 +113,7 @@ public: std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheChunksGetComplete&& OnComplete) = 0; - virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord, std::function<IoBuffer(const IoHash&)>&& GetValueFunc) = 0; }; } // namespace zen diff --git a/src/zenstore/include/zenstore/cidstore.h b/src/zenstore/include/zenstore/cidstore.h index b3d00fec0..8918b119f 100644 --- a/src/zenstore/include/zenstore/cidstore.h +++ b/src/zenstore/include/zenstore/cidstore.h @@ -87,12 +87,15 @@ public: bool ContainsChunk(const IoHash& DecompressedId); void FilterChunks(HashKeySet& InOutChunks); void Flush(); - void ScrubStorage(ScrubContext& Ctx); CidStoreSize TotalSize() const; CidStoreStats Stats() const; virtual void ReportMetrics(StatsMetrics& Statsd) override; +#if ZEN_WITH_TESTS + void Scrub(ScrubContext& Ctx); +#endif // ZEN_WITH_TESTS + private: struct Impl; std::unique_ptr<CasStore> m_CasStore; |