aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cachedisklayer.cpp
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-08-05 23:33:56 -0600
committerzousar <[email protected]>2025-08-05 23:33:56 -0600
commit4f3c5221253b627282ce6405e79f9c90dacc9b62 (patch)
tree2678e60b9aa7ed1c72cbd27404815761910613db /src/zenstore/cache/cachedisklayer.cpp
parentMerge branch 'main' into zs/put-overwrite-policy (diff)
downloadzen-4f3c5221253b627282ce6405e79f9c90dacc9b62.tar.xz
zen-4f3c5221253b627282ce6405e79f9c90dacc9b62.zip
Moving put rejections to happen in batch handling
Diffstat (limited to 'src/zenstore/cache/cachedisklayer.cpp')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp120
1 files changed, 87 insertions, 33 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index 60475e30c..33d8489c6 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1374,6 +1374,7 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
+ bool Overwrite;
};
std::vector<IoBuffer> Buffers;
std::vector<Entry> Entries;
@@ -1417,14 +1418,39 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
size_t IndexOffset = 0;
m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
- std::vector<DiskIndexEntry> DiskEntries;
+ std::vector<DiskIndexEntry> DiskEntries;
+ std::vector<ZenCacheDiskLayer::PutResult> OutResults;
+ OutResults.reserve(Locations.size());
{
+ // Initial pass without an exclusive index lock to process put rejections
+ for (size_t Index = 0; Index < Locations.size(); Index++)
+ {
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() > 0);
+
+ ZenCacheValue TemporaryValue;
+ TemporaryValue.Value = Batch->Buffers[IndexOffset + Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ OutResults.push_back({zen::PutStatus::Success});
+ ShouldRejectPut(HashKeyAndReferences[0],
+ TemporaryValue,
+ ReferenceSpan,
+ Batch->Entries[IndexOffset + Index].Overwrite,
+ OutResults.back());
+ }
+
RwLock::ExclusiveLockScope IndexLock(m_IndexLock);
for (size_t Index = 0; Index < Locations.size(); Index++)
{
+ if (OutResults[Index].Status != zen::PutStatus::Success)
+ {
+ // The put was rejected, skip any effort to commit it.
+ continue;
+ }
+
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
- ZEN_ASSERT(HashKeyAndReferences.size() > 1);
+ ZEN_ASSERT(HashKeyAndReferences.size() > 0);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
if (m_TrackedCacheKeys)
@@ -1463,7 +1489,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = {zen::PutStatus::Success};
+ Batch->OutResults[ResultIndex] = std::move(OutResults[Index]);
}
IndexOffset += Locations.size();
});
@@ -1960,16 +1986,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-ZenCacheDiskLayer::PutResult
-ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- bool Overwrite,
- PutBatchHandle* OptionalBatchHandle)
+bool
+ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult)
{
- ZEN_TRACE_CPU("Z$::Bucket::Put");
-
- metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ ZEN_UNUSED(References);
const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
if (CheckExisting)
@@ -1991,15 +2015,10 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
{
if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; }))
{
- if (OptionalBatchHandle)
- {
- OptionalBatchHandle->OutResults.push_back(PutResult{
- zen::PutStatus::Conflict,
- fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)});
- }
- return PutResult{
+ OutPutResult = PutResult{
zen::PutStatus::Conflict,
fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
+ return true;
}
ComparisonComplete = true;
}
@@ -2007,6 +2026,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
if (!ComparisonComplete)
{
+ // We must release the index lock before calling Get as that will acquire it.
+ // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions
+ // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we
+ // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing
+ // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to
+ // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock.
IndexLock.ReleaseNow();
ZenCacheValue ExistingValue;
if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue))
@@ -2014,21 +2039,39 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
uint64_t RawSize = 0;
IoHash RawHash = IoHash::Zero;
cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash);
- if (OptionalBatchHandle)
- {
- OptionalBatchHandle->OutResults.push_back(
- PutResult{zen::PutStatus::Conflict,
- fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)});
- }
- return PutResult{zen::PutStatus::Conflict,
- fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
+ OutPutResult = PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
+ return true;
}
}
}
}
+ return false;
+}
+
+ZenCacheDiskLayer::PutResult
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle)
+{
+ ZEN_TRACE_CPU("Z$::Bucket::Put");
+
+ metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+
+ PutResult Result{zen::PutStatus::Success};
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(Result);
+ }
+ return Result;
+ }
PutStandaloneCacheValue(HashKey, Value, References);
if (OptionalBatchHandle)
{
@@ -2037,11 +2080,11 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
}
else
{
- PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
+ Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle);
}
m_DiskWriteCount++;
- return PutResult{zen::PutStatus::Success};
+ return Result;
}
uint64_t
@@ -2890,25 +2933,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
return {};
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+ PutResult Result{zen::PutStatus::Success};
if (OptionalBatchHandle != nullptr)
{
OptionalBatchHandle->Buffers.push_back(Value.Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
- HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size());
+ HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
- HashKeyAndReferences.insert(HashKeyAndReferences.end(), HashKeyAndReferences.begin(), HashKeyAndReferences.end());
- return;
+ HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
+ return Result;
+ }
+
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ return Result;
}
+
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -2958,6 +3011,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
+ return Result;
}
std::string