From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenstore/cache/cachedisklayer.cpp | 93 ++++++++++- src/zenstore/cache/cacherpc.cpp | 170 +++++++++++++-------- src/zenstore/cache/structuredcachestore.cpp | 62 +++++--- .../include/zenstore/cache/cachedisklayer.h | 21 ++- src/zenstore/include/zenstore/cache/cacherpc.h | 1 + .../include/zenstore/cache/structuredcachestore.h | 6 +- 6 files changed, 252 insertions(+), 101 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..eaed1f64e 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + +bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + if (!Overwrite) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + } + } + } + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); @@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; + return true; } uint64_t @@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +bool ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + bool RetVal = false; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index cca51e63e..94072d22d 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -422,7 +422,19 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr); + bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && + !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + if (!m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return PutResult::Conflict; + } m_CacheStats.WriteCount++; if (!WriteAttachmentBuffers.empty()) @@ -753,18 +765,23 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal); std::vector ReferencedAttachments; ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - nullptr); + if (!m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } m_CacheStats.WriteCount++; } ParseValues(Request); @@ -962,20 +979,25 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal); + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); if (RawSize == 0) { RawSize = Chunk.DecodeRawSize(); } - m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Batch.get()); - m_CacheStats.WriteCount++; + bool PutSucceeded = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutSucceeded) + { + m_CacheStats.WriteCount++; + } if (Batch) { BatchResultIndexes.push_back(Results.size()); @@ -983,7 +1005,7 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con } else { - Results.push_back(true); + Results.push_back(PutSucceeded); } TransferredSize = Chunk.GetCompressedSize(); } @@ -1225,6 +1247,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal); const bool IsHit = SkipData || HasData; if (IsHit) { @@ -1235,14 +1258,18 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - m_CacheStore.Put(Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", @@ -1510,36 +1537,47 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, if (!UpstreamRecordRequests.empty()) { - const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context]( - CacheRecordGetCompleteParams&& Params) { - if (!Params.Record) - { - return; - } - CacheKeyRequest& RecordKey = Params.Request; - size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); - RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); - RecordBody& Record = Records[RecordIndex]; - - const CacheKey& Key = RecordKey.Key; - Record.Exists = true; - CbObject ObjectBuffer = CbObject::Clone(Params.Record); - Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); - Record.CacheValue.SetContentType(ZenContentType::kCbObject); - Record.Source = Params.Source; - - bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); - if (StoreLocal) - { - std::vector ReferencedAttachments; - ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { - const IoHash ValueHash = HashView.AsHash(); - ReferencedAttachments.push_back(ValueHash); - }); - m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr); - m_CacheStats.WriteCount++; - } - }; + const auto OnCacheRecordGetComplete = + [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } + CacheKeyRequest& RecordKey = Params.Request; + size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); + RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast(Params.ElapsedSeconds * 1000000.0); + RecordBody& Record = Records[RecordIndex]; + + const CacheKey& Key = RecordKey.Key; + Record.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Record.CacheValue.SetContentType(ZenContentType::kCbObject); + Record.Source = Params.Source; + + bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); + if (StoreLocal) + { + bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal); + std::vector ReferencedAttachments; + ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + ReferencedAttachments.push_back(ValueHash); + }); + if (!m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr)) + { + return; + } + m_CacheStats.WriteCount++; + } + }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } @@ -1748,20 +1786,24 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed(); if (StoreLocal) { + bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal); if (Request.IsRecordRequest) { m_CidStore.AddChunk(Params.Value, Params.RawHash); } else { - m_CacheStore.Put(Context, - Namespace, - Key.Key.Bucket, - Key.Key.Hash, - {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, - {}, - nullptr); - m_CacheStats.WriteCount++; + if (m_CacheStore.Put(Context, + Namespace, + Key.Key.Bucket, + Key.Key.Hash, + {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, + {}, + Overwrite, + nullptr)) + { + m_CacheStats.WriteCount++; + } } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 133cb42d7..25fd23b93 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -void +bool ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put"); @@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle); - m_WriteCount++; + bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal) + { + m_WriteCount++; + } + return RetVal; } bool @@ -716,13 +721,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -void +bool ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatch* OptionalBatchHandle) { // Ad hoc rejection of known bad usage patterns for DDC bucket names @@ -730,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return; + return false; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -760,9 +766,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - Store->Put(Bucket, HashKey, Value, References, BatchHandle); - m_WriteCount++; - return; + bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal) + { + m_WriteCount++; + } + else + { + m_RejectedWriteCount++; + } + return RetVal; } ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'", @@ -770,6 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); + return false; } bool @@ -1363,7 +1377,7 @@ TEST_CASE("cachestore.store") Value.Value = Obj.GetBuffer().AsIoBuffer(); Value.Value.SetContentType(ZenContentType::kCbObject); - Zcs.Put("test_bucket"sv, Key, Value, {}); + Zcs.Put("test_bucket"sv, Key, Value, {}, false); } for (int i = 0; i < kIterationCount; ++i) @@ -1417,7 +1431,7 @@ TEST_CASE("cachestore.size") const size_t Bucket = Key % 4; std::string BucketName = fmt::format("test_bucket-{}", Bucket); IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}); + Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false); Keys.push_back({BucketName, Hash}); } CacheSize = Zcs.StorageSize(); @@ -1471,7 +1485,7 @@ TEST_CASE("cachestore.size") for (size_t Key = 0; Key < Count; ++Key) { const size_t Bucket = Key % 4; - Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}); + Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false); } CacheSize = Zcs.StorageSize(); @@ -1554,7 +1568,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : Chunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); WorkCompleted.fetch_add(1); }); } @@ -1635,7 +1649,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) for (const auto& Chunk : NewChunks) { ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() { - Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}); + Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false); AddedChunkCount.fetch_add(1); WorkCompleted.fetch_add(1); }); @@ -1740,14 +1754,14 @@ TEST_CASE("cachestore.namespaces") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false); ZenCacheValue GetValue; CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue)); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); // This should just be dropped as we don't allow creating of namespaces on the fly - Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false); CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue)); } @@ -1763,7 +1777,7 @@ TEST_CASE("cachestore.namespaces") IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer(); Buffer2.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue2 = {.Value = Buffer2}; - Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}); + Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false); ZenCacheValue GetValue; CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue)); @@ -1805,7 +1819,7 @@ TEST_CASE("cachestore.drop.bucket") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1878,7 +1892,7 @@ TEST_CASE("cachestore.drop.namespace") Buffer.SetContentType(ZenContentType::kCbObject); ZenCacheValue PutValue = {.Value = Buffer}; - Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}); + Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false); return Key; }; auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) { @@ -1964,7 +1978,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") size_t Key = Buffer.Size(); IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t)); - Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false); ZenCacheValue BufferGet; CHECK(Zcs.Get("test_bucket", HashKey, BufferGet)); @@ -1974,7 +1988,7 @@ TEST_CASE("cachestore.blocked.disklayer.put") Buffer2.SetContentType(ZenContentType::kCbObject); // We should be able to overwrite even if the file is open for read - Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}); + Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false); MemoryView OldView = BufferGet.Value.GetView(); @@ -2065,7 +2079,7 @@ TEST_CASE("cachestore.scrub") AttachmentHashes.push_back(Attachment.DecodeRawHash()); CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back()); } - Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes); + Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false); } }; @@ -2114,7 +2128,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = Record.second, .RawSize = Record.second.GetSize(), .RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())}, - AttachmentKeys); + AttachmentKeys, + false); for (const auto& Attachment : Attachments) { CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash()); @@ -2130,7 +2145,8 @@ TEST_CASE("cachestore.newgc.basics") {.Value = CacheValue.second, .RawSize = CacheValue.second.GetSize(), .RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())}, - {}); + {}, + false); CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}}); return Key; }; diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b0b4f22cb..7de707a7f 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -178,10 +178,11 @@ public: PutBatchHandle* BeginPutBatch(std::vector& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle); bool Drop(); bool DropBucket(std::string_view Bucket); @@ -228,13 +229,17 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; - void Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); - bool Drop(); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutBatchHandle* BeginPutBatch(std::vector& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; + bool Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + bool Drop(); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); RwLock::SharedLockScope GetGcReferencerLock(); struct ReferencesStats diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index da8cf69fe..d489a5386 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -60,6 +60,7 @@ enum class PutResult { Success, Fail, + Conflict, Invalid, }; diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 82fec9b0e..1b5e0b76b 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -91,10 +91,11 @@ public: bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - void Put(std::string_view Bucket, + bool Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); @@ -243,12 +244,13 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - void Put(const CacheRequestContext& Context, + bool Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); -- cgit v1.2.3 From 6243ad171f12c3ba9d99244612e5c0d4991c3369 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:30:36 -0700 Subject: Move utility methods in cachedisklayer Value comparison methods moved to more appropriate area in file. --- src/zenstore/cache/cachedisklayer.cpp | 76 +++++++++++++++++------------------ 1 file changed, 38 insertions(+), 38 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index eaed1f64e..54f0c4bfc 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,42 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -1793,42 +1829,6 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -static bool -ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) -{ - if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) - { - return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t ValueRawSize = 0; - IoHash ValueRawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && - (RawHashProvider() == ValueRawHash); - } - - return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); -} - -static bool -ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) -{ - if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) - { - return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); - } - else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t Value1RawSize = 0; - IoHash Value1RawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && - ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); - } - - return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); -} - bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, @@ -1857,7 +1857,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; if (MetaData) { - if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { if (OptionalBatchHandle) { @@ -1873,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; - if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { if (OptionalBatchHandle) { -- cgit v1.2.3 From 6d07b0437ccb7800652708f76a7ee84e551f43cf Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Sun, 2 Mar 2025 00:15:35 -0700 Subject: Control overwrite enforcement with a config setting --- src/zenstore/cache/cachedisklayer.cpp | 3 ++- src/zenstore/include/zenstore/cache/cachedisklayer.h | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 54f0c4bfc..d3748e70f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1840,7 +1840,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (!Overwrite) + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) { RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 7de707a7f..239b0d1aa 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -113,6 +113,7 @@ public: uint32_t PayloadAlignment = 1u << 4; uint64_t MemCacheSizeThreshold = 1 * 1024; uint64_t LargeObjectThreshold = 128 * 1024; + bool LimitOverwrites = false; }; struct Configuration -- cgit v1.2.3 From 25985163796ba45b028b40662146e44e8eff47a8 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Mon, 24 Mar 2025 23:30:03 -0600 Subject: Establish TODOs and unit test for rejected PUT propagation --- src/zenstore/cache/cacherpc.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 94072d22d..5b36437f2 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -424,6 +424,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag CacheValue.Value.SetContentType(ZenContentType::kCbObject); bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); + // TODO: Propagation for rejected PUTs if (!m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenstore/cache/cachedisklayer.cpp | 67 ++++++--- src/zenstore/cache/cacherpc.cpp | 157 ++++++++++++--------- src/zenstore/cache/structuredcachestore.cpp | 20 +-- .../include/zenstore/cache/cachedisklayer.h | 32 +++-- src/zenstore/include/zenstore/cache/cacherpc.h | 11 +- src/zenstore/include/zenstore/cache/cacheshared.h | 8 ++ .../include/zenstore/cache/structuredcachestore.h | 40 +++--- 7 files changed, 201 insertions(+), 134 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d3748e70f..72a767645 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,25 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + static bool ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) { @@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct Entry { std::vector HashKeyAndReferences; @@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector Entries; std::vector EntryResultIndexes; - std::vector& OutResults; + std::vector& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; + Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; } IndexOffset += Locations.size(); }); @@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, @@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); } - return false; + return PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; } ComparisonComplete = true; } @@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back( + PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); } - return false; + return PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; } } } @@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else @@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; - return true; + return PutResult{zen::PutStatus::Success}; } uint64_t @@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); @@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector BucketHandles; - std::vector& OutResults; + RwLock Lock; + std::vector BucketHandles; + std::vector& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) { return new PutBatchHandle(OutResults); } @@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - bool RetVal = false; + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 5b36437f2..20c244250 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co .Policy = std::move(Policy), .Context = Context}; - PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest); - if (Result == PutResult::Invalid) + if (Result == PutStatus::Invalid) { return CbPackage{}; } - Results.push_back(Result == PutResult::Success); + Results.push_back(Result == PutStatus::Success); } if (Results.empty()) { @@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co return RpcResponse; } -PutResult +PutStatus CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { CbObjectView Record = Request.RecordObject; @@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag if (Count.Invalid > 0) { - return PutResult::Invalid; + return PutStatus::Invalid; } ZenCacheValue CacheValue; @@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) && !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal); // TODO: Propagation for rejected PUTs - if (!m_CacheStore.Put(Request.Context, - Request.Namespace, - Request.Key.Bucket, - Request.Key.Hash, - CacheValue, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context, + Request.Namespace, + Request.Key.Bucket, + Request.Key.Hash, + CacheValue, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { - return PutResult::Conflict; + return PutResult.Status; } m_CacheStats.WriteCount++; @@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } - return PutResult::Success; + return PutStatus::Success; } CbPackage @@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = {Request.RecordCacheValue}}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = {Request.RecordCacheValue}}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector BatchResults; - std::vector BatchResultIndexes; - std::vector Results; - std::vector UpstreamCacheKeys; - uint64_t RequestCount = RequestsArray.Num(); + std::vector BatchResults; + std::vector BatchResultIndexes; + std::vector Results; + std::vector UpstreamCacheKeys; + uint64_t RequestCount = RequestsArray.Num(); { Results.reserve(RequestCount); std::unique_ptr Batch; @@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { RawSize = Chunk.DecodeRawSize(); } - bool PutSucceeded = m_CacheStore.Put(Context, - *Namespace, - Key.Bucket, - Key.Hash, - {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, - {}, - Overwrite, - Batch.get()); - if (PutSucceeded) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + *Namespace, + Key.Bucket, + Key.Hash, + {.Value = Value, .RawSize = RawSize, .RawHash = RawHash}, + {}, + Overwrite, + Batch.get()); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } if (Batch) { BatchResultIndexes.push_back(Results.size()); - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail}); } else { - Results.push_back(PutSucceeded); + Results.push_back(PutResult); } TransferredSize = Chunk.GetCompressedSize(); } else { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); } Valid = true; } @@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { - Results.push_back(true); + Results.push_back({zen::PutStatus::Success}); Valid = true; } else { - Results.push_back(false); + Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)}); } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. @@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con { size_t BatchResultIndex = BatchResultIndexes[Index]; ZEN_ASSERT(BatchResultIndex < Results.size()); - ZEN_ASSERT(Results[BatchResultIndex] == false); + ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success); Results[BatchResultIndex] = BatchResults[Index]; } for (std::size_t Index = 0; Index < Results.size(); Index++) { - if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty) + if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]}); @@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response"); CbObjectWriter ResponseObject{1024}; ResponseObject.BeginArray("Result"sv); - for (bool Value : Results) + bool bAnyErrors = false; + for (const ZenCacheStore::PutResult& Value : Results) { - ResponseObject.AddBool(Value); + if (Value.Status == zen::PutStatus::Success) + { + ResponseObject.AddBool(true); + } + else + { + bAnyErrors = true; + ResponseObject.AddBool(false); + } } ResponseObject.EndArray(); + if (bAnyErrors) + { + ResponseObject.BeginArray("ErrorMessages"sv); + for (const ZenCacheStore::PutResult& Value : Results) + { + if (Value.Status != zen::PutStatus::Success) + { + ResponseObject.AddString(Value.Message); + } + } + ResponseObject.EndArray(); + } CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); @@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO if (HasData && StoreData) { - if (m_CacheStore.Put( - Context, - *Namespace, - Request.Key.Bucket, - Request.Key.Hash, - ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, - {}, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put( + Context, + *Namespace, + Request.Key.Bucket, + Request.Key.Hash, + ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash}, + {}, + Overwrite, + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } @@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, const IoHash ValueHash = HashView.AsHash(); ReferencedAttachments.push_back(ValueHash); }); - if (!m_CacheStore.Put(Context, - Namespace, - Key.Bucket, - Key.Hash, - {.Value = Record.CacheValue}, - ReferencedAttachments, - Overwrite, - nullptr)) + ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context, + Namespace, + Key.Bucket, + Key.Hash, + {.Value = Record.CacheValue}, + ReferencedAttachments, + Overwrite, + nullptr); + if (PutResult.Status != zen::PutStatus::Success) { return; } @@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context, } else { - if (m_CacheStore.Put(Context, + ZenCacheStore::PutResult PutResult = + m_CacheStore.Put(Context, Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash}, {}, Overwrite, - nullptr)) + nullptr); + if (PutResult.Status == zen::PutStatus::Success) { m_CacheStats.WriteCount++; } diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index 25fd23b93..a3f80099f 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle }; ZenCacheNamespace::PutBatchHandle* -ZenCacheNamespace::BeginPutBatch(std::vector& OutResult) +ZenCacheNamespace::BeginPutBatch(std::vector& OutResult) { ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle; Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult); @@ -252,7 +252,7 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc return; } -bool +ZenCacheNamespace::PutResult ZenCacheNamespace::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -269,8 +269,8 @@ ZenCacheNamespace::Put(std::string_view InBucket, ZEN_ASSERT(Value.Value.Size()); ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr; - bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); - if (RetVal) + PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -558,7 +558,7 @@ ZenCacheStore::LogWorker() } } -ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector& OutResult) +ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector& OutResult) : m_CacheStore(CacheStore) { ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -721,7 +721,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context, m_MissCount++; } -bool +ZenCacheStore::PutResult ZenCacheStore::Put(const CacheRequestContext& Context, std::string_view Namespace, std::string_view Bucket, @@ -736,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (IsKnownBadBucketName(Bucket)) { m_RejectedWriteCount++; - return false; + return PutResult{zen::PutStatus::Invalid, "Bad bucket name"}; } ZEN_MEMSCOPE(GetCacheStoreTag()); @@ -766,8 +766,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context, if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store) { ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr; - bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); - if (RetVal) + PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle); + if (RetVal.Status == zen::PutStatus::Success) { m_WriteCount++; } @@ -783,7 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context, Namespace, Bucket, HashKey.ToHexString()); - return false; + return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)}; } bool diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 239b0d1aa..4f5c905ee 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -166,6 +166,12 @@ public: uint64_t MemorySize; }; + struct PutResult + { + zen::PutStatus Status; + std::string Message; + }; + explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheDiskLayer(); @@ -176,19 +182,19 @@ public: void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResult); + PutBatchHandle* BeginPutBatch(std::vector& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - bool Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle); - bool Drop(); - bool DropBucket(std::string_view Bucket); - void Flush(); - void ScrubStorage(ScrubContext& Ctx); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle); + bool Drop(); + bool DropBucket(std::string_view Bucket); + void Flush(); + void ScrubStorage(ScrubContext& Ctx); void DiscoverBuckets(); GcStorageSize StorageSize() const; @@ -230,9 +236,9 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResult); + PutBatchHandle* BeginPutBatch(std::vector& OutResult); void EndPutBatch(PutBatchHandle* Batch) noexcept; - bool Put(const IoHash& HashKey, + PutResult Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, bool Overwrite, diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h index d489a5386..104746aba 100644 --- a/src/zenstore/include/zenstore/cache/cacherpc.h +++ b/src/zenstore/include/zenstore/cache/cacherpc.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -56,14 +57,6 @@ struct CacheStats std::atomic_uint64_t RpcChunkBatchRequests{}; }; -enum class PutResult -{ - Success, - Fail, - Conflict, - Invalid, -}; - /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. @@ -108,7 +101,7 @@ private: CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest); CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest); - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package); /** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */ bool ParseGetCacheChunksRequest(std::string& Namespace, diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h index 9b45c7b21..dc0c341d0 100644 --- a/src/zenstore/include/zenstore/cache/cacheshared.h +++ b/src/zenstore/include/zenstore/cache/cacheshared.h @@ -65,6 +65,14 @@ struct CacheContentStats std::vector Attachments; }; +enum class PutStatus +{ + Success, + Fail, + Conflict, + Invalid, +}; + bool IsKnownBadBucketName(std::string_view BucketName); bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer); diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h index 1b5e0b76b..581f7861b 100644 --- a/src/zenstore/include/zenstore/cache/structuredcachestore.h +++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h @@ -78,25 +78,27 @@ public: ZenCacheDiskLayer::DiskStats DiskStats; }; + using PutResult = ZenCacheDiskLayer::PutResult; + ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config); ~ZenCacheNamespace(); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResults); + PutBatchHandle* BeginPutBatch(std::vector& OutResults); void EndPutBatch(PutBatchHandle* Batch) noexcept; struct GetBatchHandle; GetBatchHandle* BeginGetBatch(std::vector& OutResults); void EndGetBatch(GetBatchHandle* Batch) noexcept; - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); - bool Put(std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle = nullptr); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle); + PutResult Put(std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Bucket); void EnumerateBucketContents(std::string_view Bucket, @@ -197,6 +199,8 @@ public: std::vector NamespaceStats; }; + using PutResult = ZenCacheNamespace::PutResult; + ZenCacheStore(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& BasePath, @@ -207,7 +211,7 @@ public: class PutBatch { public: - PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector& OutResult); + PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector& OutResult); ~PutBatch(); private: @@ -244,14 +248,14 @@ public: const IoHash& HashKey, GetBatch& BatchHandle); - bool Put(const CacheRequestContext& Context, - std::string_view Namespace, - std::string_view Bucket, - const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatch* OptionalBatchHandle = nullptr); + PutResult Put(const CacheRequestContext& Context, + std::string_view Namespace, + std::string_view Bucket, + const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatch* OptionalBatchHandle = nullptr); bool DropBucket(std::string_view Namespace, std::string_view Bucket); bool DropNamespace(std::string_view Namespace); -- cgit v1.2.3 From 45793f40c44e60185b149e5030539c679e874990 Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 16:47:19 -0600 Subject: xmake precommit --- src/zenstore/cache/cacherpc.cpp | 8 ++++---- src/zenstore/include/zenstore/cache/cachedisklayer.h | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index 436e8a083..5d9a68919 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -944,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con const bool HasUpstream = m_UpstreamCache.IsActive(); CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); - std::vector BatchResults; - eastl::fixed_vector BatchResultIndexes; - eastl::fixed_vector Results; - eastl::fixed_vector UpstreamCacheKeys; + std::vector BatchResults; + eastl::fixed_vector BatchResultIndexes; + eastl::fixed_vector Results; + eastl::fixed_vector UpstreamCacheKeys; uint64_t RequestCount = RequestsArray.Num(); { diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 11d13bede..903916137 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -244,14 +244,14 @@ public: void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle); struct PutBatchHandle; - PutBatchHandle* BeginPutBatch(std::vector& OutResult); - void EndPutBatch(PutBatchHandle* Batch) noexcept; + PutBatchHandle* BeginPutBatch(std::vector& OutResult); + void EndPutBatch(PutBatchHandle* Batch) noexcept; PutResult Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, bool Overwrite, PutBatchHandle* OptionalBatchHandle); - uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); + uint64_t MemCacheTrim(GcClock::TimePoint ExpireTime); std::function Drop(); void Flush(); void ScrubStorage(ScrubContext& Ctx); -- cgit v1.2.3 From 4f3c5221253b627282ce6405e79f9c90dacc9b62 Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 5 Aug 2025 23:33:56 -0600 Subject: Moving put rejections to happen in batch handling --- src/zenstore/cache/cachedisklayer.cpp | 120 +++++++++++++++------ .../include/zenstore/cache/cachedisklayer.h | 22 ++-- 2 files changed, 101 insertions(+), 41 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 60475e30c..33d8489c6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1374,6 +1374,7 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle struct Entry { std::vector HashKeyAndReferences; + bool Overwrite; }; std::vector Buffers; std::vector Entries; @@ -1417,14 +1418,39 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; + std::vector OutResults; + OutResults.reserve(Locations.size()); { + // Initial pass without an exclusive index lock to process put rejections + for (size_t Index = 0; Index < Locations.size(); Index++) + { + const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + OutResults.push_back({zen::PutStatus::Success}); + ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[IndexOffset + Index].Overwrite, + OutResults.back()); + } + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { + if (OutResults[Index].Status != zen::PutStatus::Success) + { + // The put was rejected, skip any effort to commit it. + continue; + } + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1463,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; + Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); } IndexOffset += Locations.size(); }); @@ -1960,16 +1986,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -ZenCacheDiskLayer::PutResult -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle) +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) { - ZEN_TRACE_CPU("Z$::Bucket::Put"); - - metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + ZEN_UNUSED(References); const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; if (CheckExisting) @@ -1991,15 +2015,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back(PutResult{ - zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); - } - return PutResult{ + OutPutResult = PutResult{ zen::PutStatus::Conflict, fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; } ComparisonComplete = true; } @@ -2007,6 +2026,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, if (!ComparisonComplete) { + // We must release the index lock before calling Get as that will acquire it. + // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions + // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we + // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing + // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to + // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) @@ -2014,21 +2039,39 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, uint64_t RawSize = 0; IoHash RawHash = IoHash::Zero; cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back( - PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); - } - return PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + return true; } } } } + return false; +} + +ZenCacheDiskLayer::PutResult +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::Put"); + + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + + PutResult Result{zen::PutStatus::Success}; if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { @@ -2037,11 +2080,11 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; - return PutResult{zen::PutStatus::Success}; + return Result; } uint64_t @@ -2890,25 +2933,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; + } + + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + return Result; } + uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -2958,6 +3011,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index 903916137..023dd1ffa 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -396,14 +396,20 @@ public: virtual std::vector CreateReferenceCheckers(GcCtx& Ctx) override; virtual std::vector CreateReferenceValidators(GcCtx& Ctx) override; - void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; - void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); - IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; - void PutInlineCacheValue(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - PutBatchHandle* OptionalBatchHandle = nullptr); - IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; + void BuildPath(PathBuilderBase& Path, const IoHash& HashKey) const; + bool ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References); + IoBuffer GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey) const; + PutResult PutInlineCacheValue(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle = nullptr); + IoBuffer GetInlineCacheValue(const DiskLocation& Loc) const; CacheValueDetails::ValueDetails GetValueDetails(RwLock::SharedLockScope&, const IoHash& Key, PayloadIndex Index) const; void SetMetaData(RwLock::ExclusiveLockScope&, -- cgit v1.2.3 From f9b2a1e64e4077925c4a57ac3fc2367c698dada1 Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:06:03 -0600 Subject: Avoid committing chunks for batch rejected puts Previously rejected puts would put the chunks, but not write them to the index, which was wrong. --- src/zenstore/cache/cachedisklayer.cpp | 77 ++++++++++++++++------------------- 1 file changed, 35 insertions(+), 42 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 33d8489c6..4535f785b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1400,56 +1400,55 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector EntryFlags; + std::vector BufferToEntryIndexes; + std::vector BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[Index].Overwrite, + OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(TemporaryValue.Value); + + uint8_t Flags = 0; + if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector DiskEntries; - std::vector OutResults; - OutResults.reserve(Locations.size()); { - // Initial pass without an exclusive index lock to process put rejections - for (size_t Index = 0; Index < Locations.size(); Index++) - { - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); - - ZenCacheValue TemporaryValue; - TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; - std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); - OutResults.push_back({zen::PutStatus::Success}); - ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[IndexOffset + Index].Overwrite, - OutResults.back()); - } RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { - if (OutResults[Index].Status != zen::PutStatus::Success) - { - // The put was rejected, skip any effort to commit it. - continue; - } - DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + const std::vector& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); @@ -1485,12 +1484,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); - } IndexOffset += Locations.size(); }); } -- cgit v1.2.3 From f1496f149259303601c0ed60747034c54873c4ac Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:07:39 -0600 Subject: precommit --- src/zenstore/cache/cachedisklayer.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'src/zenstore') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 4535f785b..502ab1508 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1402,7 +1402,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); std::vector EntryFlags; - std::vector BufferToEntryIndexes; + std::vector BufferToEntryIndexes; std::vector BuffersToCommit; BuffersToCommit.reserve(Batch->Buffers.size()); for (size_t Index = 0; Index < Batch->Entries.size(); Index++) @@ -1414,12 +1414,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept TemporaryValue.Value = Batch->Buffers[Index]; std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; - OutResult = PutResult{zen::PutStatus::Success}; - if (!ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[Index].Overwrite, - OutResult)) + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult)) { BufferToEntryIndexes.push_back(Index); BuffersToCommit.push_back(TemporaryValue.Value); @@ -1440,9 +1436,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { -- cgit v1.2.3