From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenstore/cache/cacherpc.cpp | 170 +++++++++++++++++++++++++--------------- 1 file changed, 106 insertions(+), 64 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..94072d22d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -422,7 +422,19 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + if (!m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return PutResult::Conflict; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -753,18 +765,23 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + if (!m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -962,20 +979,25 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + bool PutSucceeded = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutSucceeded) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); @@ -983,7 +1005,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con } else { - Results.push_back(true); + Results.push_back(PutSucceeded); } TransferredSize = Chunk.GetCompressedSize(); } @@ -1225,6 +1247,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1235,14 +1258,18 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1510,36 +1537,47 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + if (!m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1748,20 +1786,24 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) -- cgit v1.2.3 From 25985163796ba45b028b40662146e44e8eff47a8 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Mon, 24 Mar 2025 23:30:03 -0600 Subject: Establish TODOs and unit test for rejected PUT propagation --- src/zenstore/cache/cacherpc.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 94072d22d..5b36437f2 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -424,6 +424,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value.SetContentType(ZenContentType::kCbObject); bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenstore/cache/cacherpc.cpp | 157 +++++++++++++++++++++++----------------- 1 file changed, 92 insertions(+), 65 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5b36437f2..20c244250 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; @@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(Request.Context, - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return PutResult::Conflict; + return PutResult.Status; } m_CacheStats.WriteCount++; @@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector BatchResults; - std::vector BatchResultIndexes; - std::vector Results; - std::vector UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector BatchResults; + std::vector BatchResultIndexes; + std::vector Results; + std::vector UpstreamCacheKeys; + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr Batch; @@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { RawSize = Chunk.DecodeRawSize(); } - bool PutSucceeded = m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - Batch.get()); - if (PutSucceeded) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(PutSucceeded); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); @@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - if (m_CacheStore.Put( - Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - Namespace, - Key.Bucket, - Key.Hash, - {.Value = Record.CacheValue}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, } else { - if (m_CacheStore.Put(Context, + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, {}, Overwrite, - nullptr)) + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } -- cgit v1.2.3 From 45793f40c44e60185b149e5030539c679e874990 Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 16:47:19 -0600 Subject: xmake precommit --- src/zenstore/cache/cacherpc.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 436e8a083..5d9a68919 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -944,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector BatchResults; - eastl::fixed_vector BatchResultIndexes; - eastl::fixed_vector Results; - eastl::fixed_vector UpstreamCacheKeys; + std::vector BatchResults; + eastl::fixed_vector BatchResultIndexes; + eastl::fixed_vector Results; + eastl::fixed_vector UpstreamCacheKeys; uint64_t RequestCount = RequestsArray.Num(); { -- cgit v1.2.3 From 4c05d1041461b630cd5770dae5e8d03147d5674b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 20 Aug 2025 12:33:03 +0200 Subject: per namespace/project cas prep refactor (#470) - Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace` --- src/zenstore/cache/cacherpc.cpp | 54 ++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 17 deletions(-) (limited to 'src/zenstore/cache/cacherpc.cpp') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5d9a68919..ff21d1ede 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -153,13 +153,13 @@ CacheRpcHandler::CacheRpcHandler(LoggerRef InLog, CacheStats& InCacheStats, UpstreamCacheClient& InUpstreamCache, ZenCacheStore& InCacheStore, - CidStore& InCidStore, + GetCidStoreFunc&& InGetCidStore, const DiskWriteBlocker* InDiskWriteBlocker) : m_Log(InLog) , m_CacheStats(InCacheStats) , m_UpstreamCache(InUpstreamCache) , m_CacheStore(InCacheStore) -, m_CidStore(InCidStore) +, m_GetCidStore(std::move(InGetCidStore)) , m_DiskWriteBlocker(InDiskWriteBlocker) { } @@ -174,6 +174,12 @@ CacheRpcHandler::AreDiskWritesAllowed() const return (m_DiskWriteBlocker == nullptr || m_DiskWriteBlocker->AreDiskWritesAllowed()); } +CidStore& +CacheRpcHandler::GetCidStore(std::string_view Namespace) +{ + return m_GetCidStore(Namespace); +} + CacheRpcHandler::RpcResponseCode CacheRpcHandler::HandleRpcRequest(const CacheRequestContext& Context, std::string_view UriNamespace, @@ -381,9 +387,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Stopwatch Timer; + CidStore& ChunkStore = m_GetCidStore(Request.Namespace); + Request.RecordObject.IterateAttachments([this, &Request, Package, + &ChunkStore, &WriteAttachmentBuffers, &WriteRawHashes, &ValidAttachments, @@ -412,7 +421,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(ValueHash)) + else if (ChunkStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; @@ -448,7 +457,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (!WriteAttachmentBuffers.empty()) { - std::vector InsertResults = m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + std::vector InsertResults = ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < InsertResults.size(); Index++) { if (InsertResults[Index].New) @@ -475,10 +484,12 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (HasUpstream && EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .Namespace = Request.Namespace, - .Key = Request.Key, - .ValueContentIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, + .Namespace = Request.Namespace, + .Key = Request.Key, + .ValueContentIds = std::move(ValidAttachments)}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } return PutStatus::Success; } @@ -521,6 +532,8 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb return CbPackage{}; } + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + const bool HasUpstream = m_UpstreamCache.IsActive(); eastl::fixed_vector Requests; @@ -620,7 +633,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - if (m_CidStore.ContainsChunk(Value.ContentId)) + if (ChunkStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } @@ -641,7 +654,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } else { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + if (IoBuffer Chunk = ChunkStore.FindChunkByCid(Value.ContentId)) { if (Chunk.GetSize() > 0) { @@ -665,7 +678,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } if (!RequestValueIndexes.empty()) { - m_CidStore.IterateChunks( + ChunkStore.IterateChunks( CidHashes, [this, &Request, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { try @@ -758,7 +771,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } } - const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { + const auto OnCacheRecordGetComplete = [this, Namespace, &ChunkStore, &ParseValues, Context](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; @@ -827,7 +840,7 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Value.Exists = true; if (StoreLocal) { - m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); + ChunkStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), Attachment->GetHash()); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { @@ -944,6 +957,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + CidStore& ChunkStore = m_GetCidStore(Namespace.value()); + std::vector BatchResults; eastl::fixed_vector BatchResultIndexes; eastl::fixed_vector Results; @@ -1094,7 +1109,8 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); + {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}, + [ChunkStore = &ChunkStore](const IoHash& ValueHash) { return ChunkStore->FindChunkByCid(ValueHash); }); } } { @@ -1546,6 +1562,8 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + CidStore& ChunkStore = m_GetCidStore(Namespace); + // TODO: BatchGet records? std::vector UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) @@ -1684,12 +1702,12 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->RawSizeKnown) { - if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) + if (ChunkStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; } } - else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) + else if (IoBuffer Payload = ChunkStore.FindChunkByCid(Request->Key->ChunkId)) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { @@ -1822,6 +1840,8 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, return; } + CidStore& ChunkStore = m_GetCidStore(Namespace); + CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; @@ -1841,7 +1861,7 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { - m_CidStore.AddChunk(Params.Value, Params.RawHash); + ChunkStore.AddChunk(Params.Value, Params.RawHash); } else { -- cgit v1.2.3