diff options
| author | zousar <[email protected]> | 2025-08-05 23:33:56 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-08-05 23:33:56 -0600 |
| commit | 4f3c5221253b627282ce6405e79f9c90dacc9b62 (patch) | |
| tree | 2678e60b9aa7ed1c72cbd27404815761910613db /src/zenstore/cache/cachedisklayer.cpp | |
| parent | Merge branch 'main' into zs/put-overwrite-policy (diff) | |
| download | zen-4f3c5221253b627282ce6405e79f9c90dacc9b62.tar.xz zen-4f3c5221253b627282ce6405e79f9c90dacc9b62.zip | |
Moving put rejections to happen in batch handling
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 120 |
1 files changed, 87 insertions, 33 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 60475e30c..33d8489c6 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1374,6 +1374,7 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle struct Entry { std::vector<IoHash> HashKeyAndReferences; + bool Overwrite; }; std::vector<IoBuffer> Buffers; std::vector<Entry> Entries; @@ -1417,14 +1418,39 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { ZEN_MEMSCOPE(GetCacheDiskTag()); - std::vector<DiskIndexEntry> DiskEntries; + std::vector<DiskIndexEntry> DiskEntries; + std::vector<ZenCacheDiskLayer::PutResult> OutResults; + OutResults.reserve(Locations.size()); { + // Initial pass without an exclusive index lock to process put rejections + for (size_t Index = 0; Index < Locations.size(); Index++) + { + const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; + ZEN_ASSERT(HashKeyAndReferences.size() > 0); + + ZenCacheValue TemporaryValue; + TemporaryValue.Value = Batch->Buffers[IndexOffset + Index]; + std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end()); + OutResults.push_back({zen::PutStatus::Success}); + ShouldRejectPut(HashKeyAndReferences[0], + TemporaryValue, + ReferenceSpan, + Batch->Entries[IndexOffset + Index].Overwrite, + OutResults.back()); + } + RwLock::ExclusiveLockScope IndexLock(m_IndexLock); for (size_t Index = 0; Index < Locations.size(); Index++) { + if (OutResults[Index].Status != zen::PutStatus::Success) + { + // The put was rejected, skip any effort to commit it. + continue; + } + DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]); const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences; - ZEN_ASSERT(HashKeyAndReferences.size() > 1); + ZEN_ASSERT(HashKeyAndReferences.size() > 0); const IoHash HashKey = HashKeyAndReferences[0]; DiskEntries.push_back({.Key = HashKey, .Location = Location}); if (m_TrackedCacheKeys) @@ -1463,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept { size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index]; ZEN_ASSERT(ResultIndex < Batch->OutResults.size()); - Batch->OutResults[ResultIndex] = {zen::PutStatus::Success}; + Batch->OutResults[ResultIndex] = std::move(OutResults[Index]); } IndexOffset += Locations.size(); }); @@ -1960,16 +1986,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal } } -ZenCacheDiskLayer::PutResult -ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, - const ZenCacheValue& Value, - std::span<IoHash> References, - bool Overwrite, - PutBatchHandle* OptionalBatchHandle) +bool +ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<const IoHash> References, + bool Overwrite, + ZenCacheDiskLayer::PutResult& OutPutResult) { - ZEN_TRACE_CPU("Z$::Bucket::Put"); - - metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + ZEN_UNUSED(References); const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite; if (CheckExisting) @@ -1991,15 +2015,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, { if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; })) { - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back(PutResult{ - zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}); - } - return PutResult{ + OutPutResult = PutResult{ zen::PutStatus::Conflict, fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)}; + return true; } ComparisonComplete = true; } @@ -2007,6 +2026,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, if (!ComparisonComplete) { + // We must release the index lock before calling Get as that will acquire it. + // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions + // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we + // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing + // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to + // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock. IndexLock.ReleaseNow(); ZenCacheValue ExistingValue; if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue)) @@ -2014,21 +2039,39 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, uint64_t RawSize = 0; IoHash RawHash = IoHash::Zero; cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash); - if (OptionalBatchHandle) - { - OptionalBatchHandle->OutResults.push_back( - PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}); - } - return PutResult{zen::PutStatus::Conflict, - fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + OutPutResult = PutResult{zen::PutStatus::Conflict, + fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)}; + return true; } } } } + return false; +} + +ZenCacheDiskLayer::PutResult +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, + const ZenCacheValue& Value, + std::span<IoHash> References, + bool Overwrite, + PutBatchHandle* OptionalBatchHandle) +{ + ZEN_TRACE_CPU("Z$::Bucket::Put"); + + metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size()); + + PutResult Result{zen::PutStatus::Success}; if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold) { + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + if (OptionalBatchHandle) + { + OptionalBatchHandle->OutResults.push_back(Result); + } + return Result; + } PutStandaloneCacheValue(HashKey, Value, References); if (OptionalBatchHandle) { @@ -2037,11 +2080,11 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, } else { - PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle); + Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle); } m_DiskWriteCount++; - return PutResult{zen::PutStatus::Success}; + return Result; } uint64_t @@ -2890,25 +2933,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck return {}; } -void +ZenCacheDiskLayer::PutResult ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, const ZenCacheValue& Value, std::span<IoHash> References, + bool Overwrite, PutBatchHandle* OptionalBatchHandle) { ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue"); + PutResult Result{zen::PutStatus::Success}; if (OptionalBatchHandle != nullptr) { OptionalBatchHandle->Buffers.push_back(Value.Value); OptionalBatchHandle->Entries.push_back({}); OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size()); OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail}); + PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back(); + CurrentEntry.Overwrite = Overwrite; std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences; - HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size()); + HashKeyAndReferences.reserve(1 + References.size()); HashKeyAndReferences.push_back(HashKey); - HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end()); - return; + HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end()); + return Result; + } + + if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result)) + { + return Result; } + uint8_t EntryFlags = 0; if (Value.Value.GetContentType() == ZenContentType::kCbObject) @@ -2958,6 +3011,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, } m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); + return Result; } std::string |