aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-06-24 00:42:13 -0600
committerzousar <[email protected]>2025-06-24 00:42:13 -0600
commit081b55a5cf3d9af66d4d0be64fc38ea0055acede (patch)
treebb192956333884a7d724e9d924980912d7cf668b
parentEstablish TODOs and unit test for rejected PUT propagation (diff)
downloadzen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.tar.xz
zen-081b55a5cf3d9af66d4d0be64fc38ea0055acede.zip
Change to PutResult structure
Result structure contains status and a string message (may be empty)
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp89
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp67
-rw-r--r--src/zenstore/cache/cacherpc.cpp157
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp20
-rw-r--r--src/zenstore/include/zenstore/cache/cachedisklayer.h32
-rw-r--r--src/zenstore/include/zenstore/cache/cacherpc.h11
-rw-r--r--src/zenstore/include/zenstore/cache/cacheshared.h8
-rw-r--r--src/zenstore/include/zenstore/cache/structuredcachestore.h40
8 files changed, 258 insertions, 166 deletions
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index 932e456d2..224cc6678 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -996,9 +996,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (Success && StoreLocal)
{
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- if (m_CacheStore
- .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr))
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore
+ .Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue, {}, Overwrite, nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
@@ -1086,15 +1088,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(HttpServerRequest& Request, con
if (StoreLocal)
{
- const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
- if (m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- CacheValue,
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
@@ -1204,6 +1207,24 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
return Request.WriteResponse(HttpResponseCode::InsufficientStorage);
}
+ auto WriteFailureResponse = [&Request](const ZenCacheStore::PutResult& PutResult) {
+ ZEN_UNUSED(PutResult);
+
+ HttpResponseCode ResponseCode = HttpResponseCode::InternalServerError;
+ switch (PutResult.Status)
+ {
+ case zen::PutStatus::Conflict:
+ ResponseCode = HttpResponseCode::Conflict;
+ break;
+ case zen::PutStatus::Invalid:
+ ResponseCode = HttpResponseCode::BadRequest;
+ break;
+ }
+
+ return PutResult.Message.empty() ? Request.WriteResponse(ResponseCode)
+ : Request.WriteResponse(ResponseCode, zen::HttpContentType::kText, PutResult.Message);
+ };
+
const HttpContentType ContentType = Request.RequestContentType();
Body.SetContentType(ContentType);
@@ -1234,16 +1255,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
}
const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
// TODO: Propagation for rejected PUTs
- if (!m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
- return Request.WriteResponse(HttpResponseCode::Conflict);
+ return WriteFailureResponse(PutResult);
}
m_CacheStats.WriteCount++;
@@ -1296,16 +1318,17 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
const bool Overwrite = !EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal);
// TODO: Propagation for rejected PUTs
- if (!m_CacheStore.Put(RequestContext,
- Ref.Namespace,
- Ref.BucketSegment,
- Ref.HashKey,
- {.Value = Body},
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(RequestContext,
+ Ref.Namespace,
+ Ref.BucketSegment,
+ Ref.HashKey,
+ {.Value = Body},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
- return Request.WriteResponse(HttpResponseCode::Conflict);
+ return WriteFailureResponse(PutResult);
}
m_CacheStats.WriteCount++;
@@ -1405,9 +1428,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(HttpServerRequest& Request, con
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
// TODO: Propagation for rejected PUTs
- if (!m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite))
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(RequestContext, Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue, ReferencedAttachments, Overwrite);
+ if (PutResult.Status != zen::PutStatus::Success)
{
- return Request.WriteResponse(HttpResponseCode::Conflict);
+ return WriteFailureResponse(PutResult);
}
m_CacheStats.WriteCount++;
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index d3748e70f..72a767645 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -234,6 +234,25 @@ using namespace std::literals;
namespace zen::cache::impl {
static bool
+GetValueRawSizeAndHash(const ZenCacheValue& Value, uint64_t& OutValueRawSize, IoHash& OutValueRawHash)
+{
+ if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
+ {
+ OutValueRawSize = Value.RawSize;
+ OutValueRawHash = Value.RawHash;
+ return true;
+ }
+ else if (Value.Value.GetContentType() == ZenContentType::kCompressedBinary)
+ {
+ return CompressedBuffer::ValidateCompressedHeader(Value.Value, OutValueRawHash, OutValueRawSize);
+ }
+
+ OutValueRawSize = Value.Value.GetSize();
+ OutValueRawHash = IoHash::HashBuffer(Value.Value);
+ return true;
+}
+
+static bool
ValueMatchesRawSizeAndHash(const ZenCacheValue& Value, uint64_t RawSize, const std::function<IoHash()>& RawHashProvider)
{
if ((Value.RawSize != 0) || (Value.RawHash != IoHash::Zero))
@@ -1257,7 +1276,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc,
struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct Entry
{
std::vector<IoHash> HashKeyAndReferences;
@@ -1266,11 +1285,11 @@ struct ZenCacheDiskLayer::CacheBucket::PutBatchHandle
std::vector<Entry> Entries;
std::vector<size_t> EntryResultIndexes;
- std::vector<bool>& OutResults;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::CacheBucket::PutBatchHandle*
-ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::CacheBucket::BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResults)
{
ZEN_TRACE_CPU("Z$::Bucket::BeginPutBatch");
return new PutBatchHandle(OutResults);
@@ -1350,7 +1369,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept
{
size_t ResultIndex = Batch->EntryResultIndexes[IndexOffset + Index];
ZEN_ASSERT(ResultIndex < Batch->OutResults.size());
- Batch->OutResults[ResultIndex] = true;
+ Batch->OutResults[ResultIndex] = {zen::PutStatus::Success};
}
IndexOffset += Locations.size();
});
@@ -1829,7 +1848,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal
}
}
-bool
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
@@ -1862,9 +1881,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
{
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)});
}
- return false;
+ return PutResult{
+ zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", MetaData.RawSize, MetaData.RawHash)};
}
ComparisonComplete = true;
}
@@ -1876,11 +1899,17 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
ZenCacheValue ExistingValue;
if (Get(HashKey, ExistingValue) && !cache::impl::ValueMatchesValue(Value, ExistingValue))
{
+ uint64_t RawSize = 0;
+ IoHash RawHash = IoHash::Zero;
+ cache::impl::GetValueRawSizeAndHash(ExistingValue, RawSize, RawHash);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(
+ PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)});
}
- return false;
+ return PutResult{zen::PutStatus::Conflict,
+ fmt::format("Value exists with different size '{}' or hash '{}'", RawSize, RawHash)};
}
}
}
@@ -1891,7 +1920,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
PutStandaloneCacheValue(HashKey, Value, References);
if (OptionalBatchHandle)
{
- OptionalBatchHandle->OutResults.push_back(true);
+ OptionalBatchHandle->OutResults.push_back({zen::PutStatus::Success});
}
}
else
@@ -1900,7 +1929,7 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey,
}
m_DiskWriteCount++;
- return true;
+ return PutResult{zen::PutStatus::Success};
}
uint64_t
@@ -2742,7 +2771,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey,
OptionalBatchHandle->Buffers.push_back(Value.Value);
OptionalBatchHandle->Entries.push_back({});
OptionalBatchHandle->EntryResultIndexes.push_back(OptionalBatchHandle->OutResults.size());
- OptionalBatchHandle->OutResults.push_back(false);
+ OptionalBatchHandle->OutResults.push_back(PutResult{zen::PutStatus::Fail});
std::vector<IoHash>& HashKeyAndReferences = OptionalBatchHandle->Entries.back().HashKeyAndReferences;
HashKeyAndReferences.reserve(1 + HashKeyAndReferences.size());
HashKeyAndReferences.push_back(HashKey);
@@ -3717,7 +3746,7 @@ ZenCacheDiskLayer::GetOrCreateBucket(std::string_view InBucket)
struct ZenCacheDiskLayer::PutBatchHandle
{
- PutBatchHandle(std::vector<bool>& OutResults) : OutResults(OutResults) {}
+ PutBatchHandle(std::vector<ZenCacheDiskLayer::PutResult>& OutResults) : OutResults(OutResults) {}
struct BucketHandle
{
CacheBucket* Bucket;
@@ -3776,13 +3805,13 @@ struct ZenCacheDiskLayer::PutBatchHandle
return NewBucketHandle;
}
- RwLock Lock;
- std::vector<BucketHandle> BucketHandles;
- std::vector<bool>& OutResults;
+ RwLock Lock;
+ std::vector<BucketHandle> BucketHandles;
+ std::vector<ZenCacheDiskLayer::PutResult>& OutResults;
};
ZenCacheDiskLayer::PutBatchHandle*
-ZenCacheDiskLayer::BeginPutBatch(std::vector<bool>& OutResults)
+ZenCacheDiskLayer::BeginPutBatch(std::vector<PutResult>& OutResults)
{
return new PutBatchHandle(OutResults);
}
@@ -3919,7 +3948,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
}
}
-bool
+ZenCacheDiskLayer::PutResult
ZenCacheDiskLayer::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
@@ -3928,7 +3957,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket,
PutBatchHandle* OptionalBatchHandle)
{
ZEN_TRACE_CPU("Z$::Put");
- bool RetVal = false;
+ PutResult RetVal = {zen::PutStatus::Fail};
if (CacheBucket* Bucket = GetOrCreateBucket(InBucket); Bucket != nullptr)
{
CacheBucket::PutBatchHandle* BucketBatchHandle = OptionalBatchHandle == nullptr ? nullptr : OptionalBatchHandle->GetHandle(Bucket);
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 5b36437f2..20c244250 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
@@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
!EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
// TODO: Propagation for rejected PUTs
- if (!m_CacheStore.Put(Request.Context,
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- CacheValue,
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
- return PutResult::Conflict;
+ return PutResult.Status;
}
m_CacheStats.WriteCount++;
@@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- if (!m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
return;
}
@@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- std::vector<size_t> BatchResultIndexes;
- std::vector<bool> Results;
- std::vector<CacheKey> UpstreamCacheKeys;
- uint64_t RequestCount = RequestsArray.Num();
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ std::vector<size_t> BatchResultIndexes;
+ std::vector<ZenCacheStore::PutResult> Results;
+ std::vector<CacheKey> UpstreamCacheKeys;
+ uint64_t RequestCount = RequestsArray.Num();
{
Results.reserve(RequestCount);
std::unique_ptr<ZenCacheStore::PutBatch> Batch;
@@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
RawSize = Chunk.DecodeRawSize();
}
- bool PutSucceeded = m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Overwrite,
- Batch.get());
- if (PutSucceeded)
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(PutSucceeded);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
@@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- if (m_CacheStore.Put(
- Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
@@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- if (!m_CacheStore.Put(Context,
- Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Record.CacheValue},
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
return;
}
@@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
}
else
{
- if (m_CacheStore.Put(Context,
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
Namespace,
Key.Key.Bucket,
Key.Key.Hash,
{.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
{},
Overwrite,
- nullptr))
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index 25fd23b93..a3f80099f 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -154,7 +154,7 @@ struct ZenCacheNamespace::PutBatchHandle
};
ZenCacheNamespace::PutBatchHandle*
-ZenCacheNamespace::BeginPutBatch(std::vector<bool>& OutResult)
+ZenCacheNamespace::BeginPutBatch(std::vector<PutResult>& OutResult)
{
ZenCacheNamespace::PutBatchHandle* Handle = new ZenCacheNamespace::PutBatchHandle;
Handle->DiskLayerHandle = m_DiskLayer.BeginPutBatch(OutResult);
@@ -252,7 +252,7 @@ ZenCacheNamespace::Get(std::string_view InBucket, const IoHash& HashKey, GetBatc
return;
}
-bool
+ZenCacheNamespace::PutResult
ZenCacheNamespace::Put(std::string_view InBucket,
const IoHash& HashKey,
const ZenCacheValue& Value,
@@ -269,8 +269,8 @@ ZenCacheNamespace::Put(std::string_view InBucket,
ZEN_ASSERT(Value.Value.Size());
ZenCacheDiskLayer::PutBatchHandle* DiskLayerBatchHandle = OptionalBatchHandle ? OptionalBatchHandle->DiskLayerHandle : nullptr;
- bool RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
- if (RetVal)
+ PutResult RetVal = m_DiskLayer.Put(InBucket, HashKey, Value, References, Overwrite, DiskLayerBatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
{
m_WriteCount++;
}
@@ -558,7 +558,7 @@ ZenCacheStore::LogWorker()
}
}
-ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<bool>& OutResult)
+ZenCacheStore::PutBatch::PutBatch(ZenCacheStore& CacheStore, std::string_view InNamespace, std::vector<PutResult>& OutResult)
: m_CacheStore(CacheStore)
{
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -721,7 +721,7 @@ ZenCacheStore::Get(const CacheRequestContext& Context,
m_MissCount++;
}
-bool
+ZenCacheStore::PutResult
ZenCacheStore::Put(const CacheRequestContext& Context,
std::string_view Namespace,
std::string_view Bucket,
@@ -736,7 +736,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (IsKnownBadBucketName(Bucket))
{
m_RejectedWriteCount++;
- return false;
+ return PutResult{zen::PutStatus::Invalid, "Bad bucket name"};
}
ZEN_MEMSCOPE(GetCacheStoreTag());
@@ -766,8 +766,8 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
if (ZenCacheNamespace* Store = GetNamespace(Namespace); Store)
{
ZenCacheNamespace::PutBatchHandle* BatchHandle = OptionalBatchHandle ? OptionalBatchHandle->m_NamespaceBatchHandle : nullptr;
- bool RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
- if (RetVal)
+ PutResult RetVal = Store->Put(Bucket, HashKey, Value, References, Overwrite, BatchHandle);
+ if (RetVal.Status == zen::PutStatus::Success)
{
m_WriteCount++;
}
@@ -783,7 +783,7 @@ ZenCacheStore::Put(const CacheRequestContext& Context,
Namespace,
Bucket,
HashKey.ToHexString());
- return false;
+ return PutResult{zen::PutStatus::Fail, fmt::format("Unknown namespace '{}'", Namespace)};
}
bool
diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h
index 239b0d1aa..4f5c905ee 100644
--- a/src/zenstore/include/zenstore/cache/cachedisklayer.h
+++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h
@@ -166,6 +166,12 @@ public:
uint64_t MemorySize;
};
+ struct PutResult
+ {
+ zen::PutStatus Status;
+ std::string Message;
+ };
+
explicit ZenCacheDiskLayer(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheDiskLayer();
@@ -176,19 +182,19 @@ public:
void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- bool Put(std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- bool Overwrite,
- PutBatchHandle* OptionalBatchHandle);
- bool Drop();
- bool DropBucket(std::string_view Bucket);
- void Flush();
- void ScrubStorage(ScrubContext& Ctx);
+ PutResult Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle);
+ bool Drop();
+ bool DropBucket(std::string_view Bucket);
+ void Flush();
+ void ScrubStorage(ScrubContext& Ctx);
void DiscoverBuckets();
GcStorageSize StorageSize() const;
@@ -230,9 +236,9 @@ public:
void Get(const IoHash& HashKey, GetBatchHandle& BatchHandle);
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResult);
+ PutBatchHandle* BeginPutBatch(std::vector<ZenCacheDiskLayer::PutResult>& OutResult);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
- bool Put(const IoHash& HashKey,
+ PutResult Put(const IoHash& HashKey,
const ZenCacheValue& Value,
std::span<IoHash> References,
bool Overwrite,
diff --git a/src/zenstore/include/zenstore/cache/cacherpc.h b/src/zenstore/include/zenstore/cache/cacherpc.h
index d489a5386..104746aba 100644
--- a/src/zenstore/include/zenstore/cache/cacherpc.h
+++ b/src/zenstore/include/zenstore/cache/cacherpc.h
@@ -4,6 +4,7 @@
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
+#include <zenstore/cache/cacheshared.h>
#include <zenutil/cache/cache.h>
#include <atomic>
@@ -56,14 +57,6 @@ struct CacheStats
std::atomic_uint64_t RpcChunkBatchRequests{};
};
-enum class PutResult
-{
- Success,
- Fail,
- Conflict,
- Invalid,
-};
-
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers.
@@ -108,7 +101,7 @@ private:
CbPackage HandleRpcGetCacheValues(const CacheRequestContext& Context, CbObjectView BatchRequest);
CbPackage HandleRpcGetCacheChunks(const CacheRequestContext& Context, RpcAcceptOptions AcceptOptions, CbObjectView BatchRequest);
- PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
+ PutStatus PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
bool ParseGetCacheChunksRequest(std::string& Namespace,
diff --git a/src/zenstore/include/zenstore/cache/cacheshared.h b/src/zenstore/include/zenstore/cache/cacheshared.h
index 9b45c7b21..dc0c341d0 100644
--- a/src/zenstore/include/zenstore/cache/cacheshared.h
+++ b/src/zenstore/include/zenstore/cache/cacheshared.h
@@ -65,6 +65,14 @@ struct CacheContentStats
std::vector<IoHash> Attachments;
};
+enum class PutStatus
+{
+ Success,
+ Fail,
+ Conflict,
+ Invalid,
+};
+
bool IsKnownBadBucketName(std::string_view BucketName);
bool ValidateIoBuffer(ZenContentType ContentType, IoBuffer Buffer);
diff --git a/src/zenstore/include/zenstore/cache/structuredcachestore.h b/src/zenstore/include/zenstore/cache/structuredcachestore.h
index 1b5e0b76b..581f7861b 100644
--- a/src/zenstore/include/zenstore/cache/structuredcachestore.h
+++ b/src/zenstore/include/zenstore/cache/structuredcachestore.h
@@ -78,25 +78,27 @@ public:
ZenCacheDiskLayer::DiskStats DiskStats;
};
+ using PutResult = ZenCacheDiskLayer::PutResult;
+
ZenCacheNamespace(GcManager& Gc, JobQueue& JobQueue, const std::filesystem::path& RootDir, const Configuration& Config);
~ZenCacheNamespace();
struct PutBatchHandle;
- PutBatchHandle* BeginPutBatch(std::vector<bool>& OutResults);
+ PutBatchHandle* BeginPutBatch(std::vector<PutResult>& OutResults);
void EndPutBatch(PutBatchHandle* Batch) noexcept;
struct GetBatchHandle;
GetBatchHandle* BeginGetBatch(std::vector<ZenCacheValue>& OutResults);
void EndGetBatch(GetBatchHandle* Batch) noexcept;
- bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
- void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
- bool Put(std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- bool Overwrite,
- PutBatchHandle* OptionalBatchHandle = nullptr);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Get(std::string_view Bucket, const IoHash& HashKey, GetBatchHandle& OptionalBatchHandle);
+ PutResult Put(std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatchHandle* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Bucket);
void EnumerateBucketContents(std::string_view Bucket,
@@ -197,6 +199,8 @@ public:
std::vector<NamedNamespaceStats> NamespaceStats;
};
+ using PutResult = ZenCacheNamespace::PutResult;
+
ZenCacheStore(GcManager& Gc,
JobQueue& JobQueue,
const std::filesystem::path& BasePath,
@@ -207,7 +211,7 @@ public:
class PutBatch
{
public:
- PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<bool>& OutResult);
+ PutBatch(ZenCacheStore& CacheStore, std::string_view Namespace, std::vector<PutResult>& OutResult);
~PutBatch();
private:
@@ -244,14 +248,14 @@ public:
const IoHash& HashKey,
GetBatch& BatchHandle);
- bool Put(const CacheRequestContext& Context,
- std::string_view Namespace,
- std::string_view Bucket,
- const IoHash& HashKey,
- const ZenCacheValue& Value,
- std::span<IoHash> References,
- bool Overwrite,
- PutBatch* OptionalBatchHandle = nullptr);
+ PutResult Put(const CacheRequestContext& Context,
+ std::string_view Namespace,
+ std::string_view Bucket,
+ const IoHash& HashKey,
+ const ZenCacheValue& Value,
+ std::span<IoHash> References,
+ bool Overwrite,
+ PutBatch* OptionalBatchHandle = nullptr);
bool DropBucket(std::string_view Namespace, std::string_view Bucket);
bool DropNamespace(std::string_view Namespace);