From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenserver/cache/httpstructuredcache.cpp | 88 +++++++++++++++++++---------- 1 file changed, 59 insertions(+), 29 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..d5dd28f68 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,8 +996,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); - m_CacheStats.WriteCount++; + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + { + m_CacheStats.WriteCount++; + } } } else if (AcceptType == ZenContentType::kCbPackage) @@ -1082,30 +1086,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - nullptr); - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) { - std::vector InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& Result : InsertResults) + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) { - if (Result.New) + std::vector InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) { - Count.New++; + if (Result.New) + { + Count.New++; + } } } - } - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } } BinaryWriter MemStream; @@ -1224,13 +1232,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1279,7 +1292,19 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", @@ -1372,10 +1397,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) -- cgit v1.2.3 From 25985163796ba45b028b40662146e44e8eff47a8 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Mon, 24 Mar 2025 23:30:03 -0600 Subject: Establish TODOs and unit test for rejected PUT propagation --- src/zenserver/cache/httpstructuredcache.cpp | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index d5dd28f68..932e456d2 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1233,6 +1233,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, @@ -1294,6 +1295,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, @@ -1402,6 +1404,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) { return Request.WriteResponse(HttpResponseCode::Conflict); -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenserver/cache/httpstructuredcache.cpp | 89 ++++++++++++++++++----------- 1 file changed, 57 insertions(+), 32 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 932e456d2..224cc6678 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,9 +996,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore - .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = + m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1086,15 +1088,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); - if (m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; @@ -1204,6 +1207,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::InsufficientStorage); } + auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) { + ZEN_UNUSED(PutResult); + + HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError; + switch (PutResult.Status) + { + case zen::PutStatus::Conflict: + ResponseCode = HttpResponseCode::Conflict; + break; + case zen::PutStatus::Invalid: + ResponseCode = HttpResponseCode::BadRequest; + break; + } + + return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode) + : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message); + }; + const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); @@ -1234,16 +1255,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con } const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1296,16 +1318,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; @@ -1405,9 +1428,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite); + if (PutResult.Status != zen::PutStatus::Success) { - return Request.WriteResponse(HttpResponseCode::Conflict); + return WriteFailureResponse(PutResult); } m_CacheStats.WriteCount++; -- cgit v1.2.3 From 4c05d1041461b630cd5770dae5e8d03147d5674b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 20 Aug 2025 12:33:03 +0200 Subject: per namespace/project cas prep refactor (#470) - Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace` --- src/zenserver/cache/httpstructuredcache.cpp | 306 ++++++++++++++++++---------- 1 file changed, 197 insertions(+), 109 deletions(-) (limited to 'src/zenserver/cache/httpstructuredcache.cpp') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index acb6053d9..19ac3a216 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -85,7 +85,7 @@ namespace { ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& GetCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache, @@ -95,11 +95,10 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCach , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) -, m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) , m_DiskWriteBlocker(InDiskWriteBlocker) , m_OpenProcessCache(InOpenProcessCache) -, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, InCidStore, InDiskWriteBlocker) +, m_RpcHandler(m_Log, m_CacheStats, UpstreamCache, InCacheStore, std::move(GetCidStore), InDiskWriteBlocker) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -130,24 +129,6 @@ HttpStructuredCacheService::Flush() m_CacheStore.Flush(); } -void -HttpStructuredCacheService::ScrubStorage(ScrubContext& Ctx) -{ - if (m_LastScrubTime == Ctx.ScrubTimestamp()) - { - return; - } - - ZenCacheStore::Info Info = m_CacheStore.GetInfo(); - - ZEN_INFO("scrubbing '{}'", Info.BasePath); - - m_LastScrubTime = Ctx.ScrubTimestamp(); - - m_CidStore.ScrubStorage(Ctx); - m_CacheStore.ScrubStorage(Ctx); -} - void HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { @@ -243,6 +224,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + for (const auto& BucketIt : NamespaceIt.second.Buckets) { const std::string& Bucket = BucketIt.first; @@ -252,7 +236,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); CSVWriter << "\r\n" << Namespace << "," << Bucket << "," << ValueIt.first.ToHexString() << ", " << Hash.ToHexString() << ", " << gsl::narrow(Payload.GetSize()); @@ -270,7 +254,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } CSVWriter << ", " << gsl::narrow(AttachmentsSize); @@ -292,6 +276,9 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) for (const auto& NamespaceIt : ValueDetails.Namespaces) { const std::string& Namespace = NamespaceIt.first; + + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Namespace); + Cbo.BeginObject(); { Cbo.AddString("name", Namespace); @@ -334,7 +321,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) { Cbo.BeginObject(); Cbo.AddHash("cid", Hash); - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); Cbo.AddInteger("size", gsl::narrow(Payload.GetSize())); Cbo.EndObject(); } @@ -348,7 +335,7 @@ HttpStructuredCacheService::HandleDetailsRequest(HttpServerRequest& Request) size_t AttachmentsSize = 0; for (const IoHash& Hash : ValueIt.second.Attachments) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Hash); + IoBuffer Payload = ChunkStore.FindChunkByCid(Hash); AttachmentsSize += Payload.GetSize(); } Cbo.AddInteger("attachmentssize", gsl::narrow(AttachmentsSize)); @@ -623,6 +610,8 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque ResponseWriter.AddInteger("EntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto Buckets = HttpServerRequest::Decode(Request.GetQueryParams().GetValue("bucketsizes")); !Buckets.empty()) { ResponseWriter.BeginObject("BucketSizes"); @@ -681,7 +670,7 @@ HttpStructuredCacheService::HandleCacheNamespaceRequest(HttpServerRequest& Reque uint64_t AttachmentsSize = 0; - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( AllAttachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -749,6 +738,8 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, ResponseWriter.AddInteger("DiskEntryCount", Info->DiskLayerInfo.EntryCount); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(NamespaceName); + if (auto GetBucketSize = Request.GetQueryParams().GetValue("bucketsize"); GetBucketSize == "true") { CacheContentStats ContentStats; @@ -775,7 +766,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, WorkerThreadPool& WorkerPool = GetMediumWorkerPool(EWorkloadType::Background); - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( ContentStats.Attachments, [&](size_t Index, const IoBuffer& Payload) { ZEN_UNUSED(Index); @@ -850,6 +841,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRequestContext RequestContext = {.SessionId = Request.SessionId(), .RequestId = Request.RequestId()}; Stopwatch Timer; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { @@ -864,17 +857,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); - CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &ChunkStore, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + if (!ChunkStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(AttachmentHash.AsHash())) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -974,6 +967,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { Success = true; + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); @@ -1024,6 +1019,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con CacheRecord.IterateAttachments([this, &Package, &Ref, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ReferencedAttachments, @@ -1058,12 +1054,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con { if (SkipData) { - if (m_CidStore.ContainsChunk(Hash)) + if (ChunkStore.ContainsChunk(Hash)) { Count.Valid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash)) + else if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Hash)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Chunk)); if (Compressed) @@ -1105,7 +1101,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { std::vector InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& Result : InsertResults) { if (Result.New) @@ -1272,7 +1268,10 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { - m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + m_UpstreamCache.EnqueueUpstream( + {.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' in {}", @@ -1306,15 +1305,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con std::vector ReferencedAttachments; int32_t TotalCount = 0; - CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { - const IoHash Hash = AttachmentHash.AsHash(); - ReferencedAttachments.push_back(Hash); - if (m_CidStore.ContainsChunk(Hash)) - { - ValidAttachments.emplace_back(Hash); - } - TotalCount++; - }); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments( + [this, &ChunkStore, &TotalCount, &ValidAttachments, &ReferencedAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + ReferencedAttachments.push_back(Hash); + if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + TotalCount++; + }); const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); @@ -1348,10 +1350,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con CachePolicy Policy = PolicyFromUrl; if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbObject, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1384,38 +1388,46 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con WriteAttachmentBuffers.reserve(NumAttachments); WriteRawHashes.reserve(NumAttachments); - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, &ReferencedAttachments, &Count]( - CbFieldView HashView) { - const IoHash Hash = HashView.AsHash(); - ReferencedAttachments.push_back(Hash); - if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) - { - if (Attachment->IsCompressedBinary()) - { - WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); - WriteRawHashes.push_back(Hash); - ValidAttachments.emplace_back(Hash); - Count.Valid++; - } - else - { - ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - Hash); - Count.Invalid++; - } - } - else if (m_CidStore.ContainsChunk(Hash)) + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CacheRecord.IterateAttachments([this, + &Ref, + &Package, + &ChunkStore, + &WriteAttachmentBuffers, + &WriteRawHashes, + &ValidAttachments, + &ReferencedAttachments, + &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + ReferencedAttachments.push_back(Hash); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) + { + if (Attachment->IsCompressedBinary()) { + WriteAttachmentBuffers.emplace_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + WriteRawHashes.push_back(Hash); ValidAttachments.emplace_back(Hash); Count.Valid++; } - Count.Total++; - }); + else + { + ZEN_WARN("PUTCACHERECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + ToString(HttpContentType::kCbPackage), + Hash); + Count.Invalid++; + } + } + else if (ChunkStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + Count.Valid++; + } + Count.Total++; + }); if (Count.Invalid > 0) { @@ -1439,7 +1451,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (!WriteAttachmentBuffers.empty()) { - std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (const CidStore::InsertResult& InsertResult : InsertResults) { if (InsertResult.New) @@ -1466,10 +1478,12 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con if (HasUpstream && EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Ref.Namespace, - .Key = {Ref.BucketSegment, Ref.HashKey}, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Ref.Namespace, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } Request.WriteResponse(HttpResponseCode::Created); @@ -1503,7 +1517,9 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { Stopwatch Timer; - IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + IoBuffer Value = ChunkStore.FindChunkByCid(Ref.ValueContentId); const UpstreamEndpointInfo* Source = nullptr; CachePolicy Policy = PolicyFromUrl; @@ -1525,7 +1541,7 @@ HttpStructuredCacheService::HandleGetCacheChunk(HttpServerRequest& Request, cons { if (AreDiskWritesAllowed()) { - m_CidStore.AddChunk(UpstreamResult.Value, RawHash); + ChunkStore.AddChunk(UpstreamResult.Value, RawHash); } Source = UpstreamResult.Source; } @@ -1619,7 +1635,9 @@ HttpStructuredCacheService::HandlePutCacheChunk(HttpServerRequest& Request, cons "ValueContentId does not match attachment hash"sv); } - CidStore::InsertResult Result = m_CidStore.AddChunk(Body, RawHash); + CidStore& ChunkStore = m_RpcHandler.GetCidStore(Ref.Namespace); + + CidStore::InsertResult Result = ChunkStore.AddChunk(Body, RawHash); ZEN_DEBUG("PUTCACHECHUNK - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, @@ -1834,16 +1852,51 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) EmitSnapshot("requests", m_HttpRequests, Cbo); - const uint64_t HitCount = m_CacheStats.HitCount; - const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; - const uint64_t MissCount = m_CacheStats.MissCount; - const uint64_t WriteCount = m_CacheStats.WriteCount; - const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; - struct CidStoreStats StoreStats = m_CidStore.Stats(); - const uint64_t ChunkHitCount = StoreStats.HitCount; - const uint64_t ChunkMissCount = StoreStats.MissCount; - const uint64_t ChunkWriteCount = StoreStats.WriteCount; - const uint64_t TotalCount = HitCount + MissCount; + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t WriteCount = m_CacheStats.WriteCount; + const uint64_t BadRequestCount = m_CacheStats.BadRequestCount; + + uint64_t TotalChunkHitCount = 0; + uint64_t TotalChunkMissCount = 0; + uint64_t TotalChunkWriteCount = 0; + CidStoreSize TotalCidSize; + + tsl::robin_map UniqueStores; + { + std::vector NamespaceNames = m_CacheStore.GetNamespaces(); + + for (const std::string& NamespaceName : NamespaceNames) + { + CidStore* Store = &m_RpcHandler.GetCidStore(NamespaceName); + if (auto It = UniqueStores.find(Store); It == UniqueStores.end()) + { + UniqueStores.insert_or_assign(Store, NamespaceName); + } + else + { + UniqueStores.insert_or_assign(Store, std::string{}); + } + } + + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + + CidStoreStats StoreStats = ChunkStore->Stats(); + CidStoreSize StoreSize = ChunkStore->TotalSize(); + + TotalChunkHitCount += StoreStats.HitCount; + TotalChunkMissCount += StoreStats.MissCount; + TotalChunkWriteCount += StoreStats.WriteCount; + + TotalCidSize.TinySize += StoreSize.TinySize; + TotalCidSize.SmallSize += StoreSize.SmallSize; + TotalCidSize.LargeSize += StoreSize.LargeSize; + TotalCidSize.TotalSize += StoreSize.TotalSize; + } + } const uint64_t RpcRequests = m_CacheStats.RpcRequests; const uint64_t RpcRecordRequests = m_CacheStats.RpcRecordRequests; @@ -1853,17 +1906,11 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) const uint64_t RpcChunkRequests = m_CacheStats.RpcChunkRequests; const uint64_t RpcChunkBatchRequests = m_CacheStats.RpcChunkBatchRequests; - const CidStoreSize CidSize = m_CidStore.TotalSize(); - const GcStorageSize CacheSize = m_CacheStore.StorageSize(); + const CacheStoreSize CacheSize = m_CacheStore.TotalSize(); bool ShowCidStoreStats = Request.GetQueryParams().GetValue("cidstorestats") == "true"; bool ShowCacheStoreStats = Request.GetQueryParams().GetValue("cachestorestats") == "true"; - CidStoreStats CidStoreStats = {}; - if (ShowCidStoreStats) - { - CidStoreStats = m_CidStore.Stats(); - } ZenCacheStore::CacheStoreStats CacheStoreStats = {}; if (ShowCacheStoreStats) { @@ -1898,6 +1945,7 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo.EndObject(); Cbo << "hits" << HitCount << "misses" << MissCount << "writes" << WriteCount; + const uint64_t TotalCount = HitCount + MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); if (m_UpstreamCache.IsActive()) @@ -1908,7 +1956,9 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); } - Cbo << "cidhits" << ChunkHitCount << "cidmisses" << ChunkMissCount << "cidwrites" << ChunkWriteCount; + Cbo << "cidhits" << TotalChunkHitCount << "cidmisses" << TotalChunkMissCount << "cidwrites" << TotalChunkWriteCount; + const uint64_t TotalChunkCount = TotalChunkHitCount + TotalChunkMissCount; + Cbo << "cidhit_ratio" << (TotalChunkHitCount ? (double(TotalChunkCount) / double(TotalChunkHitCount)) : 0.0); if (ShowCacheStoreStats) { @@ -2017,20 +2067,58 @@ HttpStructuredCacheService::HandleStatsRequest(HttpServerRequest& Request) { Cbo.BeginObject("size"); { - Cbo << "tiny" << CidSize.TinySize; - Cbo << "small" << CidSize.SmallSize; - Cbo << "large" << CidSize.LargeSize; - Cbo << "total" << CidSize.TotalSize; + Cbo << "tiny" << TotalCidSize.TinySize; + Cbo << "small" << TotalCidSize.SmallSize; + Cbo << "large" << TotalCidSize.LargeSize; + Cbo << "total" << TotalCidSize.TotalSize; } Cbo.EndObject(); if (ShowCidStoreStats) { Cbo.BeginObject("store"); - Cbo << "hits" << CidStoreStats.HitCount << "misses" << CidStoreStats.MissCount << "writes" << CidStoreStats.WriteCount; - EmitSnapshot("read", CidStoreStats.FindChunkOps, Cbo); - EmitSnapshot("write", CidStoreStats.AddChunkOps, Cbo); - // EmitSnapshot("exists", CidStoreStats.ContainChunkOps, Cbo); + + auto OutputStats = [&](CidStore& ChunkStore) { + CidStoreStats StoreStats = ChunkStore.Stats(); + Cbo << "hits" << StoreStats.HitCount << "misses" << StoreStats.MissCount << "writes" << StoreStats.WriteCount; + const uint64_t Count = StoreStats.HitCount + StoreStats.MissCount; + Cbo << "hit_ratio" << (Count ? (double(StoreStats.HitCount) / double(Count)) : 0.0); + EmitSnapshot("read", StoreStats.FindChunkOps, Cbo); + EmitSnapshot("write", StoreStats.AddChunkOps, Cbo); + }; + + if (UniqueStores.size() > 1) + { + Cbo.BeginArray("namespaces"); + for (auto It : UniqueStores) + { + CidStore* ChunkStore = It.first; + const std::string& Namespace = It.second; + CidStoreSize ChunkStoreSize = ChunkStore->TotalSize(); + Cbo.BeginObject(); + { + Cbo << "namespace" << Namespace; + Cbo.BeginObject("stats"); + OutputStats(*ChunkStore); + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + Cbo << "tiny" << ChunkStoreSize.TinySize; + Cbo << "small" << ChunkStoreSize.SmallSize; + Cbo << "large" << ChunkStoreSize.LargeSize; + Cbo << "total" << ChunkStoreSize.TotalSize; + } + Cbo.EndObject(); + } + Cbo.EndObject(); + } + Cbo.EndArray(); // namespaces + } + else if (UniqueStores.size() != 0) + { + OutputStats(*UniqueStores.begin()->first); + } Cbo.EndObject(); } } -- cgit v1.2.3