aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-08-08 12:51:37 -0600
committerGitHub Enterprise <[email protected]>2025-08-08 12:51:37 -0600
commitb23e5165112848284fbc0c62fdeb2e6207693e9f (patch)
tree1dff0546e0ae7ae31a8cf0f36771639a4e68d19c /src/zenstore/cache
parent5.6.15 (diff)
parentMerge branch 'main' into zs/put-overwrite-policy (diff)
downloadzen-b23e5165112848284fbc0c62fdeb2e6207693e9f.tar.xz
zen-b23e5165112848284fbc0c62fdeb2e6207693e9f.zip
Merge pull request #434 from ue-foundation/zs/put-overwrite-policy
Zs/put overwrite policy
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp227
-rw-r--r--src/zenstore/cache/cacherpc.cpp234
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp66
3 files changed, 385 insertions, 142 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index fdf879e1f..e0030230f 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -233,6 +233,61 @@ using namespace std::literals;
namespace zen::cache::impl {
+static bool
+GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ OutValueRawSize = Value.RawSize;
+ OutValueRawHash = Value.RawHash;
+ return true;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize);
+ }
+
+ OutValueRawSize = Value.Value.GetSize();
+ OutValueRawHash = IoHash::HashBuffer(Value.Value);
+ return true;
+}
+
+static bool
+ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ return ((RawSize == Value.RawSize) && (RawHashProvider() == Value.RawHash));
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t ValueRawSize = 0;
+ IoHash ValueRawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, ValueRawHash, ValueRawSize) && (RawSize == ValueRawSize) &&
+ (RawHashProvider() == ValueRawHash);
+ }
+
+ return (RawSize == Value.Value.GetSize()) && (RawHashProvider() == IoHash::HashBuffer(Value.Value));
+}
+
+static bool
+ValueMatchesValue(const ZenCacheValue& Value1, const ZenCacheValue& Value2)
+{
+ if ((Value1.RawSize != 0) || (Value1.RawHash != IoHash::Zero))
+ {
+ return ValueMatchesRawSizeAndHash(Value2, Value1.RawSize, [&Value1]() { return Value1.RawHash; });
+ }
+ else if (Value1.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ uint64_t Value1RawSize = 0;
+ IoHash Value1RawHash = IoHash::Zero;
+ return CompressedBuffer::ValidateCompressedHeader(Value1.Value, Value1RawHash, Value1RawSize) &&
+ ValueMatchesRawSizeAndHash(Value2, Value1RawSize, [Value1RawHash]() { return Value1RawHash; });
+ }
+
+ return ValueMatchesRawSizeAndHash(Value2, Value1.Value.GetSize(), [&Value1]() { return IoHash::HashBuffer(Value1.Value); });
+}
+
class BucketManifestSerializer
{
using MetaDataIndex = ZenCacheDiskLayer::CacheBucket::MetaDataIndex;
@@ -1315,20 +1370,21 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
+ bool Overwrite;
};
std::vector<IoBuffer> Buffers;
std::vector<Entry> Entries;
std::vector<size_t> EntryResultIndexes;
- std::vector<bool>& OutResults;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
return new PutBatchHandle(OutResults);
@@ -1344,23 +1400,41 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
ZEN_ASSERT(Batch);
if (!Batch->Buffers.empty())
{
- std::vector<uint8_t> EntryFlags;
- for (const IoBuffer& Buffer : Batch->Buffers)
+ ZEN_ASSERT(Batch->Buffers.size() == Batch->Entries.size());
+ std::vector<uint8_t> EntryFlags;
+ std::vector<size_t> BufferToEntryIndexes;
+ std::vector<IoBuffer> BuffersToCommit;
+ BuffersToCommit.reserve(Batch->Buffers.size());
+ for (size_t Index = 0; Index < Batch->Entries.size(); Index++)
{
- uint8_t Flags = 0;
- if (Buffer.GetContentType() == ZenContentType::kCbObject)
+ const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[Index].HashKeyAndReferences;
+ ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
+
+ ZenCacheValue TemporaryValue;
+ TemporaryValue.Value = Batch->Buffers[Index];
+ std::span<const IoHash> ReferenceSpan(HashKeyAndReferences.begin() + 1, HashKeyAndReferences.end());
+ PutResult& OutResult = Batch->OutResults[Batch->EntryResultIndexes[Index]];
+ OutResult = PutResult{zen::PutStatus::Success};
+ if (!ShouldRejectPut(HashKeyAndReferences[0], TemporaryValue, ReferenceSpan, Batch->Entries[Index].Overwrite, OutResult))
{
- Flags |= DiskLocation::kStructured;
- }
- else if (Buffer.GetContentType() == ZenContentType::kCompressedBinary)
- {
- Flags |= DiskLocation::kCompressed;
+ BufferToEntryIndexes.push_back(Index);
+ BuffersToCommit.push_back(TemporaryValue.Value);
+
+ uint8_t Flags = 0;
+ if (TemporaryValue.Value.GetContentType() == ZenContentType::kCbObject)
+ {
+ Flags |= DiskLocation::kStructured;
+ }
+ else if (TemporaryValue.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ Flags |= DiskLocation::kCompressed;
+ }
+ EntryFlags.push_back(Flags);
}
- EntryFlags.push_back(Flags);
}
size_t IndexOffset = 0;
- m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
+ m_BlockStore.WriteChunks(BuffersToCommit, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) {
ZEN_MEMSCOPE(GetCacheDiskTag());
std::vector<DiskIndexEntry> DiskEntries;
{
@@ -1368,7 +1442,8 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
for (size_t Index = 0; Index < Locations.size(); Index++)
{
DiskLocation Location(Locations[Index], m_Configuration.PayloadAlignment, EntryFlags[IndexOffset + Index]);
- const std::vector<IoHash>& HashKeyAndReferences = Batch->Entries[IndexOffset + Index].HashKeyAndReferences;
+ const std::vector<IoHash>& HashKeyAndReferences =
+ Batch->Entries[BufferToEntryIndexes[IndexOffset + Index]].HashKeyAndReferences;
ZEN_ASSERT(HashKeyAndReferences.size() >= 1);
const IoHash HashKey = HashKeyAndReferences[0];
DiskEntries.push_back({.Key = HashKey, .Location = Location});
@@ -1404,12 +1479,6 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
}
}
m_SlogFile.Append(DiskEntries);
- for (size_t Index = 0; Index < Locations.size(); Index++)
- {
- size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
- ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = true;
- }
IndexOffset += Locations.size();
});
}
@@ -1905,30 +1974,105 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-void
+bool
+ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<const IoHash> References,
+ bool Overwrite,
+ ZenCacheDiskLayer::PutResult& OutPutResult)
+{
+ ZEN_UNUSED(References);
+
+ const bool CheckExisting = m_Configuration.LimitOverwrites && !Overwrite;
+ if (CheckExisting)
+ {
+ RwLock::SharedLockScope IndexLock(m_IndexLock);
+ auto It = m_Index.find(HashKey);
+ if (It != m_Index.end())
+ {
+ PayloadIndex EntryIndex = It.value();
+ m_AccessTimes[EntryIndex] = GcClock::TickCount();
+ DiskLocation Location = m_Payloads[EntryIndex].Location;
+
+ bool ComparisonComplete = false;
+ const BucketPayload* Payload = &m_Payloads[EntryIndex];
+ if (Payload->MetaData)
+ {
+ const BucketMetaData& MetaData = m_MetaDatas[Payload->MetaData];
+ if (MetaData)
+ {
+ if (!cache::impl::ValueMatchesRawSizeAndHash(Value, MetaData.RawSize, [&MetaData]() { return MetaData.RawHash; }))
+ {
+ OutPutResult = PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
+ return true;
+ }
+ ComparisonComplete = true;
+ }
+ }
+
+ if (!ComparisonComplete)
+ {
+ // We must release the index lock before calling Get as that will acquire it.
+ // Having the Get method not acquire the index lock is not viable because Get has potentially multiple acquisitions
+ // of the index lock, first as a shared lock, then as an exclusive lock. If the caller has an exclusive lock, we
+ // could just accept that, and not have Get do any lock acquisitions, but it creates situations where Get is doing
+ // slow operations (eg: reading a file from disk) while under an exclusive index lock, and this would lead to
+ // responsiveness issues where zenserver could fail to respond to requests because of a long-held exclusive lock.
+ IndexLock.ReleaseNow();
+ ZenCacheValue ExistingValue;
+ if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue))
+ {
+ uint64_t RawSize = 0;
+ IoHash RawHash = IoHash::Zero;
+ cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash);
+ OutPutResult = PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::Put");
metrics::RequestStats::Scope $(m_PutOps, Value.Value.Size());
+ PutResult Result{zen::PutStatus::Success};
+
if (Value.Value.Size() >= m_Configuration.LargeObjectThreshold)
{
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ if (OptionalBatchHandle)
+ {
+ OptionalBatchHandle->OutResults.push_back(Result);
+ }
+ return Result;
+ }
PutStandaloneCacheValue(HashKey, Value, References);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(true);
+ OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success});
}
}
else
{
- PutInlineCacheValue(HashKey, Value, References, OptionalBatchHandle);
+ Result = PutInlineCacheValue(HashKey, Value, References, Overwrite, OptionalBatchHandle);
}
m_DiskWriteCount++;
+ return Result;
}
uint64_t
@@ -2777,25 +2921,35 @@ ZenCacheDiskLayer::CacheBucket::GetMetaData(RwLock::SharedLockScope&, const Buck
return {};
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Bucket::PutInlineCacheValue");
+ PutResult Result{zen::PutStatus::Success};
if (OptionalBatchHandle != nullptr)
{
OptionalBatchHandle->Buffers.push_back(Value.Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
+ PutBatchHandle::Entry& CurrentEntry = OptionalBatchHandle->Entries.back();
+ CurrentEntry.Overwrite = Overwrite;
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
HashKeyAndReferences.reserve(1 + References.size());
HashKeyAndReferences.push_back(HashKey);
HashKeyAndReferences.insert(HashKeyAndReferences.end(), References.begin(), References.end());
- return;
+ return Result;
+ }
+
+ if (ShouldRejectPut(HashKey, Value, References, Overwrite, Result))
+ {
+ return Result;
}
+
uint8_t EntryFlags = 0;
if (Value.Value.GetContentType() == ZenContentType::kCbObject)
@@ -2845,6 +2999,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
}
m_SlogFile.Append({.Key = HashKey, .Location = Location});
});
+ return Result;
}
std::string
@@ -3781,7 +3936,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3840,13 +3995,13 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<bool>& OutResults;
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
{
return new PutBatchHandle(OutResults);
}
@@ -3983,21 +4138,23 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-void
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
-
+ PutResult RetVal = {zen::PutStatus::Fail};
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
- Bucket->Put(HashKey, Value, References, BucketBatchHandle);
+ RetVal = Bucket->Put(HashKey, Value, References, Overwrite, BucketBatchHandle);
TryMemCacheTrim();
}
+ return RetVal;
}
void
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index de4b0a37c..5d9a68919 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -334,13 +334,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -360,7 +360,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -422,14 +422,28 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
+ bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
+ !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return PutResult.Status;
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -466,7 +480,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -765,18 +779,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal);
std::vector<IoHash> ReferencedAttachments;
ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- nullptr);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -924,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- eastl::fixed_vector<size_t, 32> BatchResultIndexes;
- eastl::fixed_vector<bool, 32> Results;
- eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
uint64_t RequestCount = RequestsArray.Num();
{
@@ -977,34 +997,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Batch.get());
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(true);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1020,12 +1045,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1060,13 +1085,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
@@ -1076,11 +1101,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1239,6 +1285,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal);
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -1249,14 +1296,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
@@ -1527,36 +1579,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
- CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal);
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1774,20 +1838,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
m_CidStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index d956384ca..973af52b2 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
@@ -252,11 +252,12 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-void
+ZenCacheNamespace::PutResult
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU(OptionalBatchHandle ? "Z$::Namespace::Put(Batched)" : "Z$::Namespace::Put");
@@ -268,8 +269,12 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- m_DiskLayer.Put(InBucket, HashKey, Value, References, DiskLayerBatchHandle);
- m_WriteCount++;
+ PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ return RetVal;
}
bool
@@ -557,7 +562,7 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -720,13 +725,14 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-void
+ZenCacheStore::PutResult
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
+ bool Overwrite,
PutBatch* OptionalBatchHandle)
{
// Ad hoc rejection of known bad usage patterns for DDC bucket names
@@ -734,7 +740,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return;
+ return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -764,9 +770,16 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- Store->Put(Bucket, HashKey, Value, References, BatchHandle);
- m_WriteCount++;
- return;
+ PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
+ {
+ m_WriteCount++;
+ }
+ else
+ {
+ m_RejectedWriteCount++;
+ }
+ return RetVal;
}
ZEN_WARN("request for unknown namespace '{}' in ZenCacheStore::Put [{}] bucket '{}', key '{}'",
@@ -774,6 +787,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
+ return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
}
bool
@@ -1378,7 +1392,7 @@ TEST_CASE("cachestore.store")
Value.Value = Obj.GetBuffer().AsIoBuffer();
Value.Value.SetContentType(ZenContentType::kCbObject);
- Zcs.Put("test_bucket"sv, Key, Value, {});
+ Zcs.Put("test_bucket"sv, Key, Value, {}, false);
}
for (int i = 0; i < kIterationCount; ++i)
@@ -1432,7 +1446,7 @@ TEST_CASE("cachestore.size")
const size_t Bucket = Key % 4;
std::string BucketName = fmt::format("test_bucket-{}", Bucket);
IoHash Hash = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {});
+ Zcs.Put(BucketName, Hash, ZenCacheValue{.Value = Buffer}, {}, false);
Keys.push_back({BucketName, Hash});
}
CacheSize = Zcs.StorageSize();
@@ -1486,7 +1500,7 @@ TEST_CASE("cachestore.size")
for (size_t Key = 0; Key < Count; ++Key)
{
const size_t Bucket = Key % 4;
- Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {});
+ Zcs.Put(fmt::format("test_bucket-{}", Bucket), IoHash::HashBuffer(&Key, sizeof(uint32_t)), {.Value = Buffer}, {}, false);
}
CacheSize = Zcs.StorageSize();
@@ -1569,7 +1583,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : Chunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, &Chunk]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
WorkCompleted.fetch_add(1);
});
}
@@ -1650,7 +1664,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
for (const auto& Chunk : NewChunks)
{
ThreadPool.ScheduleWork([&Zcs, &WorkCompleted, Chunk, &AddedChunkCount]() {
- Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {});
+ Zcs.Put(Chunk.second.Bucket, Chunk.first, {.Value = Chunk.second.Buffer}, {}, false);
AddedChunkCount.fetch_add(1);
WorkCompleted.fetch_add(1);
});
@@ -1755,14 +1769,14 @@ TEST_CASE("cachestore.namespaces")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, PutValue, {}, false);
ZenCacheValue GetValue;
CHECK(Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key1, GetValue));
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
// This should just be dropped as we don't allow creating of namespaces on the fly
- Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key1, PutValue, {}, false);
CHECK(!Zcs.Get(Context, CustomNamespace, Bucket, Key1, GetValue));
}
@@ -1778,7 +1792,7 @@ TEST_CASE("cachestore.namespaces")
IoBuffer Buffer2 = CacheValue2.GetBuffer().AsIoBuffer();
Buffer2.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue2 = {.Value = Buffer2};
- Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {});
+ Zcs.Put(Context, CustomNamespace, Bucket, Key2, PutValue2, {}, false);
ZenCacheValue GetValue;
CHECK(!Zcs.Get(Context, ZenCacheStore::DefaultNamespace, Bucket, Key2, GetValue));
@@ -1820,7 +1834,7 @@ TEST_CASE("cachestore.drop.bucket")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1893,7 +1907,7 @@ TEST_CASE("cachestore.drop.namespace")
Buffer.SetContentType(ZenContentType::kCbObject);
ZenCacheValue PutValue = {.Value = Buffer};
- Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {});
+ Zcs.Put(Context, Namespace, Bucket, Key, PutValue, {}, false);
return Key;
};
auto GetValue = [&Context](ZenCacheStore& Zcs, std::string_view Namespace, std::string_view Bucket, const IoHash& Key) {
@@ -1979,7 +1993,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
size_t Key = Buffer.Size();
IoHash HashKey = IoHash::HashBuffer(&Key, sizeof(uint32_t));
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer}, {}, false);
ZenCacheValue BufferGet;
CHECK(Zcs.Get("test_bucket", HashKey, BufferGet));
@@ -1989,7 +2003,7 @@ TEST_CASE("cachestore.blocked.disklayer.put")
Buffer2.SetContentType(ZenContentType::kCbObject);
// We should be able to overwrite even if the file is open for read
- Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {});
+ Zcs.Put("test_bucket", HashKey, {.Value = Buffer2}, {}, false);
MemoryView OldView = BufferGet.Value.GetView();
@@ -2080,7 +2094,7 @@ TEST_CASE("cachestore.scrub")
AttachmentHashes.push_back(Attachment.DecodeRawHash());
CidStore.AddChunk(Attachment.GetCompressed().Flatten().AsIoBuffer(), AttachmentHashes.back());
}
- Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes);
+ Zcs.Put("mybucket", Cid, {.Value = Record.Record}, AttachmentHashes, false);
}
};
@@ -2129,7 +2143,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = Record.second,
.RawSize = Record.second.GetSize(),
.RawHash = IoHash::HashBuffer(Record.second.GetData(), Record.second.GetSize())},
- AttachmentKeys);
+ AttachmentKeys,
+ false);
for (const auto& Attachment : Attachments)
{
CidStore.AddChunk(Attachment.second.GetCompressed().Flatten().AsIoBuffer(), Attachment.second.DecodeRawHash());
@@ -2145,7 +2160,8 @@ TEST_CASE("cachestore.newgc.basics")
{.Value = CacheValue.second,
.RawSize = CacheValue.second.GetSize(),
.RawHash = IoHash::HashBuffer(CacheValue.second.GetData(), CacheValue.second.GetSize())},
- {});
+ {},
+ false);
CacheEntries.insert({Key, CacheEntry{CacheValue.second, {}}});
return Key;
};