aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache
diff options
context:
space:
mode:
authorZousar Shaker <[email protected]>2025-09-25 10:52:11 -0600
committerGitHub Enterprise <[email protected]>2025-09-25 10:52:11 -0600
commitae107e8e7fe1cdf573a93c8d8908592ea671ee51 (patch)
tree2accd8ecaa662f3527b7ba6567c07aed8cbe6693 /src/zenstore/cache
parent5.7.2-pre2 (diff)
parentImprovement to Incomplete Result Iteration (diff)
downloadzen-ae107e8e7fe1cdf573a93c8d8908592ea671ee51.tar.xz
zen-ae107e8e7fe1cdf573a93c8d8908592ea671ee51.zip
Merge pull request #509 from ue-foundation/zs/put-overwrite-policy-response
Zs/put overwrite policy response
Diffstat (limited to 'src/zenstore/cache')
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp30
-rw-r--r--src/zenstore/cache/cacherpc.cpp107
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp9
3 files changed, 108 insertions, 38 deletions
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index fd52cdab5..ba82dd942 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -1968,17 +1968,19 @@ ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
IndexLock.ReleaseNow();
if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue))
{
- OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"};
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", "Value provided is of bad format");
+ OutPutResult = PutResult{zen::PutStatus::Fail, DetailWriter.Save()};
return true;
}
else if (MetaData.RawSize != InOutValue.RawSize || MetaData.RawHash != InOutValue.RawHash)
{
- OutPutResult = PutResult{
- zen::PutStatus::Conflict,
- fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
- return true;
+ // Deliberate fall through without return so that we load the value and include it in the result
+ }
+ else
+ {
+ return false;
}
- return false;
}
}
@@ -2008,16 +2010,22 @@ ZenCacheDiskLayer::CacheBucket::ShouldRejectPut(const IoHash& HashKey,
{
if (!cache::impl::UpdateValueWithRawSizeAndHash(InOutValue))
{
- OutPutResult = PutResult{zen::PutStatus::Fail, "Value provided is of bad format"};
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", "Value provided is of bad format");
+ OutPutResult = PutResult{zen::PutStatus::Fail, DetailWriter.Save()};
return true;
}
if (ExistingValue.RawSize != InOutValue.RawSize || ExistingValue.RawHash != InOutValue.RawHash)
{
- OutPutResult = PutResult{zen::PutStatus::Conflict,
- fmt::format("Value exists with different size '{}' or hash '{}'",
- ExistingValue.RawSize,
- ExistingValue.RawHash)};
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddInteger("RawSize", ExistingValue.RawSize);
+ DetailWriter.AddHash("RawHash", ExistingValue.RawHash);
+ if (Location.IsFlagSet(DiskLocation::kStructured))
+ {
+ DetailWriter.AddObject("Record", CbObject(SharedBuffer(ExistingValue.Value)));
+ }
+ OutPutResult = PutResult{zen::PutStatus::Conflict, DetailWriter.Save()};
return true;
}
}
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 83301f863..e4f23816d 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -310,7 +310,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
- eastl::fixed_vector<bool, 32> Results;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
for (CbFieldView RequestField : RequestsArray)
@@ -332,33 +332,50 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
+ ZenCacheStore::PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutStatus::Invalid)
+ if (Result.Status == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutStatus::Success);
+ Results.push_back(Result);
}
if (Results.empty())
{
return CbPackage{};
}
+ bool bWriteAllDetails = false;
CbObjectWriter ResponseObject{256};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ for (const ZenCacheStore::PutResult& Result : Results)
{
- ResponseObject.AddBool(Value);
+ // Conflicts will be treated for response purposes
+ // as successful puts because the key is present in the cache.
+ ResponseObject.AddBool((Result.Status == PutStatus::Success) || (Result.Status == PutStatus::Conflict));
+ if (Result.Details)
+ {
+ bWriteAllDetails = true;
+ }
}
ResponseObject.EndArray();
+ if (bWriteAllDetails)
+ {
+ ResponseObject.BeginArray("Details"sv);
+ for (const ZenCacheStore::PutResult& Result : Results)
+ {
+ ResponseObject.AddObject(Result.Details);
+ }
+ ResponseObject.EndArray();
+ }
+
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
return RpcResponse;
}
-PutStatus
+ZenCacheStore::PutResult
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -420,7 +437,9 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutStatus::Invalid;
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", fmt::format("Found {}/{} invalid attachments", Count.Invalid, Count.Total));
+ return ZenCacheStore::PutResult{PutStatus::Invalid, DetailWriter.Save()};
}
ZenCacheValue CacheValue;
@@ -440,7 +459,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
nullptr);
if (PutResult.Status != zen::PutStatus::Success)
{
- return PutResult.Status;
+ return PutResult;
}
m_CacheStats.WriteCount++;
@@ -478,7 +497,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutStatus::Success;
+ return PutResult;
}
CbPackage
@@ -841,14 +860,10 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
Key.Hash);
}
}
- if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ if (!Value.Exists)
{
Request.Complete = false;
}
- // Request.Complete does not need to be set to false for upstream SkipData attachments.
- // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
- // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
- // any missing SkipData attachments.
}
Request.ElapsedTimeUs += Timer.GetElapsedTimeUs();
}
@@ -864,7 +879,9 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
ResponsePackage.ReserveAttachments(Requests.size());
+ eastl::fixed_vector<size_t, 4> IncompleteResultIndexes;
ResponseObject.BeginArray("Result"sv);
+ size_t ResultIndex = 0;
for (RecordRequestData& Request : Requests)
{
const CacheKey& Key = Request.Key;
@@ -880,6 +897,12 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
}
}
+ if (!Request.Complete)
+ {
+ // If requesting a partial record, report back that the record overall is incomplete to the client
+ IncompleteResultIndexes.push_back(ResultIndex);
+ }
+
ZEN_DEBUG("GETCACHERECORD HIT - '{}/{}/{}' {}{} ({}) in {}",
*Namespace,
Key.Bucket,
@@ -916,8 +939,37 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
m_CacheStats.MissCount++;
}
}
+ ++ResultIndex;
}
ResponseObject.EndArray();
+
+ if (!IncompleteResultIndexes.empty())
+ {
+ size_t IndexIntoIncompleteResultArray = 0;
+ size_t IncompleteResultIndex = IncompleteResultIndexes[IndexIntoIncompleteResultArray];
+ ResultIndex = 0;
+ ResponseObject.BeginArray("Incomplete"sv);
+ for (ResultIndex = 0; ResultIndex < Requests.size(); ++ResultIndex)
+ {
+ if (IncompleteResultIndex == ResultIndex)
+ {
+ ResponseObject.AddBool(true);
+ if (++IndexIntoIncompleteResultArray >= IncompleteResultIndexes.size())
+ {
+ IncompleteResultIndex = (size_t)-1;
+ }
+ else
+ {
+ IncompleteResultIndex = IncompleteResultIndexes[IndexIntoIncompleteResultArray];
+ }
+ }
+ else
+ {
+ ResponseObject.AddBool(true);
+ }
+ }
+ ResponseObject.EndArray();
+ }
ResponsePackage.SetObject(ResponseObject.Save());
return ResponsePackage;
}
@@ -1048,7 +1100,9 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
}
else
{
- Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", fmt::format("Missing attachment with raw hash {}", RawHash));
+ Results.push_back({zen::PutStatus::Fail, DetailWriter.Save()});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1099,29 +1153,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- bool bAnyErrors = false;
+ bool bWriteAllDetails = false;
for (const ZenCacheStore::PutResult& Value : Results)
{
- if (Value.Status == zen::PutStatus::Success)
+ // Conflicts will be treated for response purposes
+ // as successful puts because the key is present in the cache.
+ if ((Value.Status == zen::PutStatus::Success) || (Value.Status == zen::PutStatus::Conflict))
{
ResponseObject.AddBool(true);
}
else
{
- bAnyErrors = true;
ResponseObject.AddBool(false);
}
+ // If one result has details, we must write all details
+ if (Value.Details)
+ {
+ bWriteAllDetails = true;
+ }
}
ResponseObject.EndArray();
- if (bAnyErrors)
+ if (bWriteAllDetails)
{
- ResponseObject.BeginArray("ErrorMessages"sv);
+ ResponseObject.BeginArray("Details"sv);
for (const ZenCacheStore::PutResult& Value : Results)
{
- if (Value.Status != zen::PutStatus::Success)
- {
- ResponseObject.AddString(Value.Message);
- }
+ ResponseObject.AddObject(Value.Details);
}
ResponseObject.EndArray();
}
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 3f27e6d21..b58f70ea7 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -730,7 +730,9 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", "Bad bucket name");
+ return PutResult{zen::PutStatus::Invalid, DetailWriter.Save()};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -777,7 +779,10 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
- return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
+
+ CbObjectWriter DetailWriter;
+ DetailWriter.AddString("Message", fmt::format("Unknown namespace '{}'", Namespace));
+ return PutResult{zen::PutStatus::Fail, DetailWriter.Save()};
}
bool