From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenserver/cache/httpstructuredcache.cpp | 88 +++++++---- src/zenstore/cache/cachedisklayer.cpp | 93 ++++++++++- src/zenstore/cache/cacherpc.cpp | 170 +++++++++++++-------- src/zenstore/cache/structuredcachestore.cpp | 62 +++++--- .../include/zenstore/cache/cachedisklayer.h | 21 ++- src/zenstore/include/zenstore/cache/cacherpc.h | 1 + .../include/zenstore/cache/structuredcachestore.h | 6 +- 7 files changed, 311 insertions(+), 130 deletions(-) (limited to 'src') diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index b9a9ca380..d5dd28f68 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -996,8 +996,12 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (Success && StoreLocal) { - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, nullptr); - m_CacheStats.WriteCount++; + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore + .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr)) + { + m_CacheStats.WriteCount++; + } } } else if (AcceptType == ZenContentType::kCbPackage) @@ -1082,30 +1086,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con if (StoreLocal) { - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - CacheValue, - ReferencedAttachments, - nullptr); - m_CacheStats.WriteCount++; - - if (!WriteAttachmentBuffers.empty()) + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) { - std::vector InsertResults = - m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); - for (const CidStore::InsertResult& Result : InsertResults) + m_CacheStats.WriteCount++; + + if (!WriteAttachmentBuffers.empty()) { - if (Result.New) + std::vector InsertResults = + m_CidStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + for (const CidStore::InsertResult& Result : InsertResults) { - Count.New++; + if (Result.New) + { + Count.New++; + } } } - } - WriteAttachmentBuffers = {}; - WriteRawHashes = {}; + WriteAttachmentBuffers = {}; + WriteRawHashes = {}; + } } BinaryWriter MemStream; @@ -1224,13 +1232,18 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con { RawHash = IoHash::HashBuffer(SharedBuffer(Body)); } - m_CacheStore.Put(RequestContext, - Ref.Namespace, - Ref.BucketSegment, - Ref.HashKey, - {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, - {}, - nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (HasUpstream && EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) @@ -1279,7 +1292,19 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con TotalCount++; }); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}, ReferencedAttachments, nullptr); + const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); + + if (!m_CacheStore.Put(RequestContext, + Ref.Namespace, + Ref.BucketSegment, + Ref.HashKey, + {.Value = Body}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total) in {}", @@ -1372,10 +1397,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } + const bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments); + if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite)) + { + return Request.WriteResponse(HttpResponseCode::Conflict); + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..eaed1f64e 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + +bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + if (!Overwrite) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + } + } + } + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); @@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; + return true; } uint64_t @@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +bool ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + bool RetVal = false; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..94072d22d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -422,7 +422,19 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + if (!m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return PutResult::Conflict; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -753,18 +765,23 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + if (!m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -962,20 +979,25 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + bool PutSucceeded = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutSucceeded) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); @@ -983,7 +1005,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con } else { - Results.push_back(true); + Results.push_back(PutSucceeded); } TransferredSize = Chunk.GetCompressedSize(); } @@ -1225,6 +1247,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1235,14 +1258,18 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1510,36 +1537,47 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + if (!m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1748,20 +1786,24 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 133cb42d7..25fd23b93 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -void +bool ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put"); @@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle); - m_WriteCount++; + bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal) + { + m_WriteCount++; + } + return RetVal; } bool @@ -716,13 +721,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -void +bool ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatch* OptionalBatchHandle) { // Ad hoc rejection of known bad usage patterns for DDC bucket names @@ -730,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return; + return false; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -760,9 +766,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - Store->Put(Bucket, HashKey, Value, References, BatchHandle); - m_WriteCount++; - return; + bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal) + { + m_WriteCount++; + } + else + { + m_RejectedWriteCount++; + } + return RetVal; } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'", @@ -770,6 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); + return false; } bool @@ -1363,7 +1377,7 @@ TEST_CASE("cachestore.store") Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); - Zcs.Put("test_bucket"sv, Key, Value, {}); + Zcs.Put("test_bucket"sv, Key, Value, {}, false); } for (int i = 0; i < kIterationCount; ++i) @@ -1417,7 +1431,7 @@ TEST_CASE("cachestore.size") const size_t Bucket = Key % 4; std::string BucketName = fmt::format("test_bucket-{}", Bucket); IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}); + Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } CacheSize = Zcs.StorageSize(); @@ -1471,7 +1485,7 @@ TEST_CASE("cachestore.size") for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}); + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false); } CacheSize = Zcs.StorageSize(); @@ -1554,7 +1568,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }); } @@ -1635,7 +1649,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); @@ -1740,14 +1754,14 @@ TEST_CASE("cachestore.namespaces") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false); ZenCacheValue GetValue; CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); } @@ -1763,7 +1777,7 @@ TEST_CASE("cachestore.namespaces") IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false); ZenCacheValue GetValue; CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); @@ -1805,7 +1819,7 @@ TEST_CASE("cachestore.drop.bucket") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1878,7 +1892,7 @@ TEST_CASE("cachestore.drop.namespace") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1964,7 +1978,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") size_t Key = Buffer.Size(); IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false); ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); @@ -1974,7 +1988,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") Buffer2.SetContentType(ZenContentType::kCbObject); // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false); MemoryView OldView = BufferGet.Value.GetView(); @@ -2065,7 +2079,7 @@ TEST_CASE("cachestore.scrub") AttachmentHashes.push_back(Attachment.DecodeRawHash()); CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back()); } - Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false); } }; @@ -2114,7 +2128,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = Record.second, .RawSize = Record.second.GetSize(), .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())}, - AttachmentKeys); + AttachmentKeys, + false); for (const auto& Attachment : Attachments) { CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash()); @@ -2130,7 +2145,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = CacheValue.second, .RawSize = CacheValue.second.GetSize(), .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())}, - {}); + {}, + false); CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}}); return Key; }; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b0b4f22cb..7de707a7f 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -178,10 +178,11 @@ public: PutBatchHandle* BeginPutBatch(std::vector& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle); bool Drop(); bool DropBucket(std::string_view Bucket); @@ -228,13 +229,17 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - bool Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + bool Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index da8cf69fe..d489a5386 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -60,6 +60,7 @@ enum class PutResult { Success, Fail, + Conflict, Invalid, }; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 82fec9b0e..1b5e0b76b 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -91,10 +91,11 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); @@ -243,12 +244,13 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - void Put(const CacheRequestContext& Context, + bool Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); -- cgit v1.2.3