From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenstore/cache/cachedisklayer.cpp | 93 +++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 4 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..eaed1f64e 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + +bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + if (!Overwrite) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + } + } + } + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); @@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; + return true; } uint64_t @@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +bool ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + bool RetVal = false; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void -- cgit v1.2.3 From 6243ad171f12c3ba9d99244612e5c0d4991c3369 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:30:36 -0700 Subject: Move utility methods in cachedisklayer Value comparison methods moved to more appropriate area in file. --- src/zenstore/cache/cachedisklayer.cpp | 76 +++++++++++++++++------------------ 1 file changed, 38 insertions(+), 38 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index eaed1f64e..54f0c4bfc 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,42 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -1793,42 +1829,6 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -static bool -ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) -{ - if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) - { - return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t ValueRawSize = 0; - IoHash ValueRawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && - (RawHashProvider() == ValueRawHash); - } - - return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); -} - -static bool -ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) -{ - if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) - { - return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); - } - else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t Value1RawSize = 0; - IoHash Value1RawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && - ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); - } - - return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); -} - bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, @@ -1857,7 +1857,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; if (MetaData) { - if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { if (OptionalBatchHandle) { @@ -1873,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; - if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { if (OptionalBatchHandle) { -- cgit v1.2.3 From 6d07b0437ccb7800652708f76a7ee84e551f43cf Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Sun, 2 Mar 2025 00:15:35 -0700 Subject: Control overwrite enforcement with a config setting --- src/zenstore/cache/cachedisklayer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 54f0c4bfc..d3748e70f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1840,7 +1840,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (!Overwrite) + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) { RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenstore/cache/cachedisklayer.cpp | 67 +++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 19 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d3748e70f..72a767645 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,25 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + static bool ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) { @@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct Entry { std::vector HashKeyAndReferences; @@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector Entries; std::vector EntryResultIndexes; - std::vector& OutResults; + std::vector& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; + Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; } IndexOffset += Locations.size(); }); @@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, @@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); } - return false; + return PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; } ComparisonComplete = true; } @@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back( + PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); } - return false; + return PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; } } } @@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else @@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; - return true; + return PutResult{zen::PutStatus::Success}; } uint64_t @@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); @@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector BucketHandles; - std::vector& OutResults; + RwLock Lock; + std::vector BucketHandles; + std::vector& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) { return new PutBatchHandle(OutResults); } @@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - bool RetVal = false; + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); -- cgit v1.2.3 From 7905fcd4faaad12d846b803d9eb5ed538def1610 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 4 Aug 2025 17:06:47 +0200 Subject: add hardening for legacy cache bucket manifests (#454) --- src/zenstore/cache/cachedisklayer.cpp | 51 +++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 11 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 15a1c9650..a5cab885f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -348,11 +348,20 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck Stopwatch Timer; const auto _ = MakeGuard([&] { ZEN_INFO("parsed store manifest '{}' in {}", ManifestPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); - const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + const uint64_t Count = Manifest["Count"sv].AsUInt64(0); + CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); + if (KeyArray.Num() != Count) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) array size and 'Count' ({}) in {}, skipping metadata", + KeyArray.Num(), + Count, + ManifestPath); + return; + } + std::vector KeysIndexes; KeysIndexes.reserve(Count); - CbArrayView KeyArray = Manifest["Keys"sv].AsArrayView(); for (CbFieldView& KeyView : KeyArray) { if (auto It = Index.find(KeyView.AsHash()); It != Index.end()) @@ -367,19 +376,43 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck size_t KeyIndexOffset = 0; CbArrayView TimeStampArray = Manifest["Timestamps"].AsArrayView(); - for (CbFieldView& TimeStampView : TimeStampArray) + if (KeysIndexes.size() != TimeStampArray.Num()) { - const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; - if (KeyIndex) + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'Timestamps' ({}) arrays in {}, skipping timestamps", + KeysIndexes.size(), + TimeStampArray.Num(), + ManifestPath); + } + else + { + for (CbFieldView& TimeStampView : TimeStampArray) { - AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + const PayloadIndex KeyIndex = KeysIndexes[KeyIndexOffset++]; + if (KeyIndex) + { + AccessTimes[KeyIndex] = TimeStampView.AsInt64(); + } } } KeyIndexOffset = 0; CbArrayView RawHashArray = Manifest["RawHash"].AsArrayView(); CbArrayView RawSizeArray = Manifest["RawSize"].AsArrayView(); - if (RawHashArray.Num() == RawSizeArray.Num()) + if (RawHashArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawHash' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawHashArray.Num(), + ManifestPath); + } + else if (RawSizeArray.Num() != KeysIndexes.size()) + { + ZEN_WARN("Mismatch in size between 'Keys' ({}) and 'RawSize' ({}) arrays in {}, skipping meta data", + KeysIndexes.size(), + RawSizeArray.Num(), + ManifestPath); + } + else { auto RawHashIt = RawHashArray.CreateViewIterator(); auto RawSizeIt = RawSizeArray.CreateViewIterator(); @@ -404,10 +437,6 @@ BucketManifestSerializer::ParseManifest(RwLock::ExclusiveLockScope& Buck RawSizeIt++; } } - else - { - ZEN_WARN("Mismatch in size between 'RawHash' and 'RawSize' arrays in {}, skipping meta data", ManifestPath); - } } Oid -- cgit v1.2.3 From 4f3c5221253b627282ce6405e79f9c90dacc9b62 Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 5 Aug 2025 23:33:56 -0600 Subject: Moving put rejections to happen in batch handling --- src/zenstore/cache/cachedisklayer.cpp | 120 ++++++++++++++++++++++++---------- 1 file changed, 87 insertions(+), 33 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 60475e30c..33d8489c6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1374,6 +1374,7 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle struct Entry { std::vector HashKeyAndReferences; + bool Overwrite; }; std::vector Buffers; std::vector Entries; @@ -1417,14 +1418,39 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; + std::vector OutResults; + OutResults.reserve(Locations.size()); { + // Initial pass without an exclusive index lock to process put rejections + for (size_t Index = 0; Index < Locations.size(); Index++) + { + const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + OutResults.push_back({zen::PutStatus::Success}); + ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[IndexOffset + Index].Overwrite, + OutResults.back()); + } + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { + if (OutResults[Index].Status != zen::PutStatus::Success) + { + // The put was rejected, skip any effort to commit it. + continue; + } + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1463,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; + Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); } IndexOffset += Locations.size(); }); @@ -1960,16 +1986,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -ZenCacheDiskLayer::PutResult -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle) +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) { - ZEN_TRACE_CPU("Z$::Bucket::Put"); - - metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + ZEN_UNUSED(References); const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; if (CheckExisting) @@ -1991,15 +2015,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back(PutResult{ - zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); - } - return PutResult{ + OutPutResult = PutResult{ zen::PutStatus::Conflict, fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; } ComparisonComplete = true; } @@ -2007,6 +2026,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, if (!ComparisonComplete) { + // We must release the index lock before calling Get as that will acquire it. + // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions + // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we + // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing + // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to + // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) @@ -2014,21 +2039,39 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, uint64_t RawSize = 0; IoHash RawHash = IoHash::Zero; cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back( - PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); - } - return PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + return true; } } } } + return false; +} + +ZenCacheDiskLayer::PutResult +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::Put"); + + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + + PutResult Result{zen::PutStatus::Success}; if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { @@ -2037,11 +2080,11 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; - return PutResult{zen::PutStatus::Success}; + return Result; } uint64_t @@ -2890,25 +2933,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; + } + + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + return Result; } + uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -2958,6 +3011,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string -- cgit v1.2.3 From 11d80722e209b14e132c0344f5cb10124d3fa8cb Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 6 Aug 2025 17:00:51 +0200 Subject: add the correct set of references hashes in batched inline mode (#459) --- src/zenstore/cache/cachedisklayer.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index a5cab885f..fdf879e1f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1369,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + ZEN_ASSERT(HashKeyAndReferences.size() >= 1); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -2791,9 +2791,9 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(false); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); return; } uint8_t EntryFlags = 0; -- cgit v1.2.3 From f9b2a1e64e4077925c4a57ac3fc2367c698dada1 Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:06:03 -0600 Subject: Avoid committing chunks for batch rejected puts Previously rejected puts would put the chunks, but not write them to the index, which was wrong. --- src/zenstore/cache/cachedisklayer.cpp | 77 ++++++++++++++++------------------- 1 file changed, 35 insertions(+), 42 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 33d8489c6..4535f785b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1400,56 +1400,55 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector EntryFlags; + std::vector BufferToEntryIndexes; + std::vector BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[Index].Overwrite, + OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(TemporaryValue.Value); + + uint8_t Flags = 0; + if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector DiskEntries; - std::vector OutResults; - OutResults.reserve(Locations.size()); { - // Initial pass without an exclusive index lock to process put rejections - for (size_t Index = 0; Index < Locations.size(); Index++) - { - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); - - ZenCacheValue TemporaryValue; - TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; - std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); - OutResults.push_back({zen::PutStatus::Success}); - ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[IndexOffset + Index].Overwrite, - OutResults.back()); - } RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { - if (OutResults[Index].Status != zen::PutStatus::Success) - { - // The put was rejected, skip any effort to commit it. - continue; - } - DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + const std::vector& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); @@ -1485,12 +1484,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); - } IndexOffset += Locations.size(); }); } -- cgit v1.2.3 From f1496f149259303601c0ed60747034c54873c4ac Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:07:39 -0600 Subject: precommit --- src/zenstore/cache/cachedisklayer.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 4535f785b..502ab1508 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1402,7 +1402,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); std::vector EntryFlags; - std::vector BufferToEntryIndexes; + std::vector BufferToEntryIndexes; std::vector BuffersToCommit; BuffersToCommit.reserve(Batch->Buffers.size()); for (size_t Index = 0; Index < Batch->Entries.size(); Index++) @@ -1414,12 +1414,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept TemporaryValue.Value = Batch->Buffers[Index]; std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; - OutResult = PutResult{zen::PutStatus::Success}; - if (!ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[Index].Overwrite, - OutResult)) + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult)) { BufferToEntryIndexes.push_back(Index); BuffersToCommit.push_back(TemporaryValue.Value); @@ -1440,9 +1436,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { -- cgit v1.2.3 From 30bd2b7a618c4c9d1978d68c91e90ee388b48b24 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 12 Aug 2025 17:23:19 +0200 Subject: reduce lock contention when checking for disk cache put reject (#465) keep rawsize and rawhash if available when using batch for inline puts keep rawsize and rawhash of input value if we have calculated it for validation already --- src/zenstore/cache/cachedisklayer.cpp | 172 ++++++++++++++++------------------ 1 file changed, 81 insertions(+), 91 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index e0030230f..219caca01 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -234,58 +234,25 @@ using namespace std::literals; namespace zen::cache::impl { static bool -GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +UpdateValueWithRawSizeAndHash(ZenCacheValue& Value) { - if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + if ((Value.RawSize == 0) && (Value.RawHash == IoHash::Zero)) { - OutValueRawSize = Value.RawSize; - OutValueRawHash = Value.RawHash; - return true; - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); - } - - OutValueRawSize = Value.Value.GetSize(); - OutValueRawHash = IoHash::HashBuffer(Value.Value); - return true; -} - -static bool -ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) -{ - if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) - { - return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, Value.RawHash, Value.RawSize); + } + else + { + Value.RawSize = Value.Value.GetSize(); + Value.RawHash = IoHash::HashBuffer(Value.Value); + return true; + } } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + else { - uint64_t ValueRawSize = 0; - IoHash ValueRawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && - (RawHashProvider() == ValueRawHash); + return true; } - - return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); -} - -static bool -ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) -{ - if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) - { - return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); - } - else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t Value1RawSize = 0; - IoHash Value1RawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && - ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); - } - - return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); } class BucketManifestSerializer @@ -1376,9 +1343,9 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector HashKeyAndReferences; bool Overwrite; }; - std::vector Buffers; - std::vector Entries; - std::vector EntryResultIndexes; + std::vector Buffers; + std::vector Entries; + std::vector EntryResultIndexes; std::vector& OutResults; }; @@ -1410,22 +1377,21 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept const std::vector& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() >= 1); - ZenCacheValue TemporaryValue; - TemporaryValue.Value = Batch->Buffers[Index]; + ZenCacheValue& Value = Batch->Buffers[Index]; std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; OutResult = PutResult{zen::PutStatus::Success}; - if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult)) + if (!ShouldRejectPut(HashKeyAndReferences[0], Value, Batch->Entries[Index].Overwrite, OutResult)) { BufferToEntryIndexes.push_back(Index); - BuffersToCommit.push_back(TemporaryValue.Value); + BuffersToCommit.push_back(Value.Value); uint8_t Flags = 0; - if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + if (Value.Value.GetContentType() == ZenContentType::kCbObject) { Flags |= DiskLocation::kStructured; } - else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) { Flags |= DiskLocation::kCompressed; } @@ -1976,13 +1942,10 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal bool ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, + ZenCacheValue& InOutValue, bool Overwrite, ZenCacheDiskLayer::PutResult& OutPutResult) { - ZEN_UNUSED(References); - const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; if (CheckExisting) { @@ -1990,46 +1953,71 @@ ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, auto It = m_Index.find(HashKey); if (It != m_Index.end()) { - PayloadIndex EntryIndex = It.value(); - m_AccessTimes[EntryIndex] = GcClock::TickCount(); - DiskLocation Location = m_Payloads[EntryIndex].Location; + const PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + const DiskLocation Location = m_Payloads[EntryIndex].Location; - bool ComparisonComplete = false; - const BucketPayload* Payload = &m_Payloads[EntryIndex]; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; if (Payload->MetaData) { - const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + const BucketMetaData MetaData = m_MetaDatas[Payload->MetaData]; if (MetaData) { - if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + IndexLock.ReleaseNow(); + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + else if (MetaData.RawSize != InOutValue.RawSize || MetaData.RawHash != InOutValue.RawHash) { OutPutResult = PutResult{ zen::PutStatus::Conflict, fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; return true; } - ComparisonComplete = true; + return false; } } - if (!ComparisonComplete) + ZenCacheValue ExistingValue; + if (Payload->MemCached) { - // We must release the index lock before calling Get as that will acquire it. - // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions - // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we - // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing - // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to - // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. + ExistingValue.Value = m_MemCachedPayloads[Payload->MemCached].Payload; IndexLock.ReleaseNow(); - ZenCacheValue ExistingValue; - if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) + } + else + { + IndexLock.ReleaseNow(); + + if (Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + ExistingValue.Value = GetStandaloneCacheValue(Location, HashKey); + } + else { - uint64_t RawSize = 0; - IoHash RawHash = IoHash::Zero; - cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); - OutPutResult = PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; - return true; + ExistingValue.Value = GetInlineCacheValue(Location); + } + } + + if (ExistingValue.Value) + { + if (cache::impl::UpdateValueWithRawSizeAndHash(ExistingValue)) + { + if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue)) + { + OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"}; + return true; + } + + if (ExistingValue.RawSize != InOutValue.RawSize || ExistingValue.RawHash != InOutValue.RawHash) + { + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", + ExistingValue.RawSize, + ExistingValue.RawHash)}; + return true; + } } } } @@ -2052,7 +2040,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { - if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) { if (OptionalBatchHandle) { @@ -2060,7 +2049,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } return Result; } - PutStandaloneCacheValue(HashKey, Value, References); + PutStandaloneCacheValue(HashKey, AcceptedValue, References); if (OptionalBatchHandle) { OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); @@ -2932,7 +2921,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { - OptionalBatchHandle->Buffers.push_back(Value.Value); + OptionalBatchHandle->Buffers.push_back(Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); @@ -2945,24 +2934,25 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, return Result; } - if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + ZenCacheValue AcceptedValue = Value; + if (ShouldRejectPut(HashKey, AcceptedValue, Overwrite, Result)) { return Result; } uint8_t EntryFlags = 0; - if (Value.Value.GetContentType() == ZenContentType::kCbObject) + if (AcceptedValue.Value.GetContentType() == ZenContentType::kCbObject) { EntryFlags |= DiskLocation::kStructured; } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + else if (AcceptedValue.Value.GetContentType() == ZenContentType::kCompressedBinary) { EntryFlags |= DiskLocation::kCompressed; } - m_BlockStore.WriteChunk(Value.Value.Data(), - Value.Value.Size(), + m_BlockStore.WriteChunk(AcceptedValue.Value.Data(), + AcceptedValue.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { ZEN_MEMSCOPE(GetCacheDiskTag()); -- cgit v1.2.3 From 4c05d1041461b630cd5770dae5e8d03147d5674b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 20 Aug 2025 12:33:03 +0200 Subject: per namespace/project cas prep refactor (#470) - Refactor so we can have more than one cas store for project store and cache. - Refactor `UpstreamCacheClient` so it is not tied to a specific CidStore - Refactor scrub to keep the GC interface ScrubStorage function separate from scrub accessor functions (renamed to Scrub). - Refactor storage size to keep GC interface StorageSize function separate from size accessor functions (renamed to TotalSize) - Refactor cache storage so `ZenCacheDiskLayer::CacheBucket` implements GcStorage interface rather than `ZenCacheNamespace` --- src/zenstore/cache/cachedisklayer.cpp | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 219caca01..cacbbd966 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -798,6 +798,7 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, m_Configuration.LargeObjectThreshold = Max(m_Configuration.LargeObjectThreshold, IoStoreDDCOverrideSize); } m_Gc.AddGcReferencer(*this); + m_Gc.AddGcStorage(this); } ZenCacheDiskLayer::CacheBucket::~CacheBucket() @@ -812,6 +813,7 @@ ZenCacheDiskLayer::CacheBucket::~CacheBucket() { ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); } + m_Gc.RemoveGcStorage(this); m_Gc.RemoveGcReferencer(*this); } @@ -2587,7 +2589,7 @@ ZenCacheDiskLayer::CacheBucket::ScrubStorage(ScrubContext& Ctx) ZenCacheDiskLayer::BucketStats ZenCacheDiskLayer::CacheBucket::Stats() { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); return ZenCacheDiskLayer::BucketStats{.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize, .DiskHitCount = m_DiskHitCount, @@ -4417,8 +4419,9 @@ ZenCacheDiskLayer::Flush() } } +#if ZEN_WITH_TESTS void -ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { ZEN_TRACE_CPU("Z$::ScrubStorage"); @@ -4429,13 +4432,13 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) for (auto& Kv : m_Buckets) { -#if 1 +# if 1 Results.push_back(Ctx.ThreadPool().EnqueueTask( std::packaged_task{[this, Bucket = Kv.second.get(), &Ctx] { Bucket->ScrubStorage(Ctx); }})); -#else +# else CacheBucket& Bucket = *Kv.second; Bucket.ScrubStorage(Ctx); -#endif +# endif } for (auto& Result : Results) @@ -4451,16 +4454,17 @@ ZenCacheDiskLayer::ScrubStorage(ScrubContext& Ctx) } } } +#endif // ZEN_WITH_TESTS -GcStorageSize -ZenCacheDiskLayer::StorageSize() const +CacheStoreSize +ZenCacheDiskLayer::TotalSize() const { - GcStorageSize StorageSize{}; + CacheStoreSize StorageSize{}; RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); StorageSize.DiskSize += BucketSize.DiskSize; StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4471,7 +4475,7 @@ ZenCacheDiskLayer::StorageSize() const ZenCacheDiskLayer::DiskStats ZenCacheDiskLayer::Stats() const { - GcStorageSize Size = StorageSize(); + CacheStoreSize Size = TotalSize(); ZenCacheDiskLayer::DiskStats Stats = {.DiskSize = Size.DiskSize, .MemorySize = Size.MemorySize}; { RwLock::SharedLockScope _(m_Lock); @@ -4495,7 +4499,7 @@ ZenCacheDiskLayer::GetInfo() const { Info.BucketNames.push_back(Kv.first); Info.EntryCount += Kv.second->EntryCount(); - GcStorageSize BucketSize = Kv.second->StorageSize(); + CacheStoreSize BucketSize = Kv.second->TotalSize(); Info.StorageSize.DiskSize += BucketSize.DiskSize; Info.StorageSize.MemorySize += BucketSize.MemorySize; } @@ -4510,7 +4514,7 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) { - return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->StorageSize()}; + return ZenCacheDiskLayer::BucketInfo{.EntryCount = It->second->EntryCount(), .StorageSize = It->second->TotalSize()}; } return {}; } -- cgit v1.2.3