From 0bbd1fb43bbd9f878a2aa326ef06f2dc503a3b3f Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 11:49:05 -0700 Subject: Enforce Overwrite Prevention According To Cache Policy Overwrite with differing value should be denied if QueryLocal is not present and StoreLocal is present. Overwrite with equal value should succeed regardless of policy flags. --- src/zenstore/cache/cachedisklayer.cpp | 93 +++++++++++++++++++++++++++++++++-- 1 file changed, 89 insertions(+), 4 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 25f68330a..eaed1f64e 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1793,16 +1793,98 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -void +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + +bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::Put"); metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + if (!Overwrite) + { + RwLock::SharedLockScope IndexLock(m_IndexLock); + auto It = m_Index.find(HashKey); + if (It != m_Index.end()) + { + PayloadIndex EntryIndex = It.value(); + m_AccessTimes[EntryIndex] = GcClock::TickCount(); + DiskLocation Location = m_Payloads[EntryIndex].Location; + + bool ComparisonComplete = false; + const BucketPayload* Payload = &m_Payloads[EntryIndex]; + if (Payload->MetaData) + { + const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; + if (MetaData) + { + if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + ComparisonComplete = true; + } + } + + if (!ComparisonComplete) + { + IndexLock.ReleaseNow(); + ZenCacheValue ExistingValue; + if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(false); + } + return false; + } + } + } + } + if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { PutStandaloneCacheValue(HashKey, Value, References); @@ -1817,6 +1899,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; + return true; } uint64_t @@ -3835,21 +3918,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -void +bool ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - + bool RetVal = false; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); - Bucket->Put(HashKey, Value, References, BucketBatchHandle); + RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle); TryMemCacheTrim(); } + return RetVal; } void -- cgit v1.2.3 From 6243ad171f12c3ba9d99244612e5c0d4991c3369 Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Wed, 26 Feb 2025 14:30:36 -0700 Subject: Move utility methods in cachedisklayer Value comparison methods moved to more appropriate area in file. --- src/zenstore/cache/cachedisklayer.cpp | 76 +++++++++++++++++------------------ 1 file changed, 38 insertions(+), 38 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index eaed1f64e..54f0c4bfc 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,42 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t ValueRawSize = 0; + IoHash ValueRawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && + (RawHashProvider() == ValueRawHash); + } + + return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); +} + +static bool +ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) +{ + if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) + { + return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); + } + else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + uint64_t Value1RawSize = 0; + IoHash Value1RawHash = IoHash::Zero; + return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && + ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); + } + + return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); +} + class BucketManifestSerializer { using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex; @@ -1793,42 +1829,6 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -static bool -ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) -{ - if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) - { - return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash)); - } - else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t ValueRawSize = 0; - IoHash ValueRawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) && - (RawHashProvider() == ValueRawHash); - } - - return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value)); -} - -static bool -ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2) -{ - if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero)) - { - return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; }); - } - else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary) - { - uint64_t Value1RawSize = 0; - IoHash Value1RawHash = IoHash::Zero; - return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) && - ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; }); - } - - return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); }); -} - bool ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, @@ -1857,7 +1857,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData]; if (MetaData) { - if (!ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) + if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { if (OptionalBatchHandle) { @@ -1873,7 +1873,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; - if (Get(HashKey, ExistingValue) && !ValueMatchesValue(Value, ExistingValue)) + if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { if (OptionalBatchHandle) { -- cgit v1.2.3 From 6d07b0437ccb7800652708f76a7ee84e551f43cf Mon Sep 17 00:00:00 2001 From: zousar <2936246+zousar@users.noreply.github.com> Date: Sun, 2 Mar 2025 00:15:35 -0700 Subject: Control overwrite enforcement with a config setting --- src/zenstore/cache/cachedisklayer.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 54f0c4bfc..d3748e70f 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1840,7 +1840,8 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); - if (!Overwrite) + const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; + if (CheckExisting) { RwLock::SharedLockScope IndexLock(m_IndexLock); auto It = m_Index.find(HashKey); -- cgit v1.2.3 From 081b55a5cf3d9af66d4d0be64fc38ea0055acede Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 24 Jun 2025 00:42:13 -0600 Subject: Change to PutResult structure Result structure contains status and a string message (may be empty) --- src/zenstore/cache/cachedisklayer.cpp | 67 +++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 19 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index d3748e70f..72a767645 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -233,6 +233,25 @@ using namespace std::literals; namespace zen::cache::impl { +static bool +GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash) +{ + if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero)) + { + OutValueRawSize = Value.RawSize; + OutValueRawHash = Value.RawHash; + return true; + } + else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize); + } + + OutValueRawSize = Value.Value.GetSize(); + OutValueRawHash = IoHash::HashBuffer(Value.Value); + return true; +} + static bool ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function& RawHashProvider) { @@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct Entry { std::vector HashKeyAndReferences; @@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle std::vector Entries; std::vector EntryResultIndexes; - std::vector& OutResults; + std::vector& OutResults; }; ZenCacheDiskLayer::CacheBucket::PutBatchHandle* -ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector& OutResults) { ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch"); return new PutBatchHandle(OutResults); @@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = true; + Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; } IndexOffset += Locations.size(); }); @@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, @@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); } - return false; + return PutResult{ + zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; } ComparisonComplete = true; } @@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) { + uint64_t RawSize = 0; + IoHash RawHash = IoHash::Zero; + cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back( + PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); } - return false; + return PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; } } } @@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { - OptionalBatchHandle->OutResults.push_back(true); + OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success}); } } else @@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } m_DiskWriteCount++; - return true; + return PutResult{zen::PutStatus::Success}; } uint64_t @@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); - OptionalBatchHandle->OutResults.push_back(false); + OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); HashKeyAndReferences.push_back(HashKey); @@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket) struct ZenCacheDiskLayer::PutBatchHandle { - PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} + PutBatchHandle(std::vector& OutResults) : OutResults(OutResults) {} struct BucketHandle { CacheBucket* Bucket; @@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle return NewBucketHandle; } - RwLock Lock; - std::vector BucketHandles; - std::vector& OutResults; + RwLock Lock; + std::vector BucketHandles; + std::vector& OutResults; }; ZenCacheDiskLayer::PutBatchHandle* -ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) +ZenCacheDiskLayer::BeginPutBatch(std::vector& OutResults) { return new PutBatchHandle(OutResults); } @@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc } } -bool +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value, @@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Put"); - bool RetVal = false; + PutResult RetVal = {zen::PutStatus::Fail}; if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr) { CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket); -- cgit v1.2.3 From 4f3c5221253b627282ce6405e79f9c90dacc9b62 Mon Sep 17 00:00:00 2001 From: zousar Date: Tue, 5 Aug 2025 23:33:56 -0600 Subject: Moving put rejections to happen in batch handling --- src/zenstore/cache/cachedisklayer.cpp | 120 ++++++++++++++++++++++++---------- 1 file changed, 87 insertions(+), 33 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 60475e30c..33d8489c6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1374,6 +1374,7 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle struct Entry { std::vector HashKeyAndReferences; + bool Overwrite; }; std::vector Buffers; std::vector Entries; @@ -1417,14 +1418,39 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; + std::vector OutResults; + OutResults.reserve(Locations.size()); { + // Initial pass without an exclusive index lock to process put rejections + for (size_t Index = 0; Index < Locations.size(); Index++) + { + const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + OutResults.push_back({zen::PutStatus::Success}); + ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[IndexOffset + Index].Overwrite, + OutResults.back()); + } + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { + if (OutResults[Index].Status != zen::PutStatus::Success) + { + // The put was rejected, skip any effort to commit it. + continue; + } + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1463,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; + Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); } IndexOffset += Locations.size(); }); @@ -1960,16 +1986,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -ZenCacheDiskLayer::PutResult -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle) +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) { - ZEN_TRACE_CPU("Z$::Bucket::Put"); - - metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + ZEN_UNUSED(References); const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; if (CheckExisting) @@ -1991,15 +2015,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back(PutResult{ - zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); - } - return PutResult{ + OutPutResult = PutResult{ zen::PutStatus::Conflict, fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; } ComparisonComplete = true; } @@ -2007,6 +2026,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, if (!ComparisonComplete) { + // We must release the index lock before calling Get as that will acquire it. + // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions + // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we + // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing + // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to + // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) @@ -2014,21 +2039,39 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, uint64_t RawSize = 0; IoHash RawHash = IoHash::Zero; cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back( - PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); - } - return PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + return true; } } } } + return false; +} + +ZenCacheDiskLayer::PutResult +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::Put"); + + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + + PutResult Result{zen::PutStatus::Success}; if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { @@ -2037,11 +2080,11 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; - return PutResult{zen::PutStatus::Success}; + return Result; } uint64_t @@ -2890,25 +2933,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; + } + + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + return Result; } + uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -2958,6 +3011,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string -- cgit v1.2.3 From f9b2a1e64e4077925c4a57ac3fc2367c698dada1 Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:06:03 -0600 Subject: Avoid committing chunks for batch rejected puts Previously rejected puts would put the chunks, but not write them to the index, which was wrong. --- src/zenstore/cache/cachedisklayer.cpp | 77 ++++++++++++++++------------------- 1 file changed, 35 insertions(+), 42 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 33d8489c6..4535f785b 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1400,56 +1400,55 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept ZEN_ASSERT(Batch); if (!Batch->Buffers.empty()) { - std::vector EntryFlags; - for (const IoBuffer& Buffer : Batch->Buffers) + ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); + std::vector EntryFlags; + std::vector BufferToEntryIndexes; + std::vector BuffersToCommit; + BuffersToCommit.reserve(Batch->Buffers.size()); + for (size_t Index = 0; Index < Batch->Entries.size(); Index++) { - uint8_t Flags = 0; - if (Buffer.GetContentType() == ZenContentType::kCbObject) + const std::vector& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[Index]; + std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[Index].Overwrite, + OutResult)) { - Flags |= DiskLocation::kStructured; - } - else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary) - { - Flags |= DiskLocation::kCompressed; + BufferToEntryIndexes.push_back(Index); + BuffersToCommit.push_back(TemporaryValue.Value); + + uint8_t Flags = 0; + if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject) + { + Flags |= DiskLocation::kStructured; + } + else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary) + { + Flags |= DiskLocation::kCompressed; + } + EntryFlags.push_back(Flags); } - EntryFlags.push_back(Flags); } size_t IndexOffset = 0; - m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span Locations) { + m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector DiskEntries; - std::vector OutResults; - OutResults.reserve(Locations.size()); { - // Initial pass without an exclusive index lock to process put rejections - for (size_t Index = 0; Index < Locations.size(); Index++) - { - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 0); - - ZenCacheValue TemporaryValue; - TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; - std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); - OutResults.push_back({zen::PutStatus::Success}); - ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[IndexOffset + Index].Overwrite, - OutResults.back()); - } RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { - if (OutResults[Index].Status != zen::PutStatus::Success) - { - // The put was rejected, skip any effort to commit it. - continue; - } - DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); - const std::vector& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + const std::vector& HashKeyAndReferences = + Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences; ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); @@ -1485,12 +1484,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept } } m_SlogFile.Append(DiskEntries); - for (size_t Index = 0; Index < Locations.size(); Index++) - { - size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; - ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); - } IndexOffset += Locations.size(); }); } -- cgit v1.2.3 From f1496f149259303601c0ed60747034c54873c4ac Mon Sep 17 00:00:00 2001 From: zousar Date: Thu, 7 Aug 2025 01:07:39 -0600 Subject: precommit --- src/zenstore/cache/cachedisklayer.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) (limited to 'src/zenstore/cache/cachedisklayer.cpp') diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 4535f785b..502ab1508 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1402,7 +1402,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size()); std::vector EntryFlags; - std::vector BufferToEntryIndexes; + std::vector BufferToEntryIndexes; std::vector BuffersToCommit; BuffersToCommit.reserve(Batch->Buffers.size()); for (size_t Index = 0; Index < Batch->Entries.size(); Index++) @@ -1414,12 +1414,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept TemporaryValue.Value = Batch->Buffers[Index]; std::span ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]]; - OutResult = PutResult{zen::PutStatus::Success}; - if (!ShouldRejectPut(HashKeyAndReferences[0], - TemporaryValue, - ReferenceSpan, - Batch->Entries[Index].Overwrite, - OutResult)) + OutResult = PutResult{zen::PutStatus::Success}; + if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult)) { BufferToEntryIndexes.push_back(Index); BuffersToCommit.push_back(TemporaryValue.Value); @@ -1440,9 +1436,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector DiskEntries; + std::vector DiskEntries; { - RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { -- cgit v1.2.3