aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp157
1 files changed, 92 insertions, 65 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index 5b36437f2..20c244250 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -327,13 +327,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -353,7 +353,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -415,7 +415,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
@@ -425,16 +425,17 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
!EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
// TODO: Propagation for rejected PUTs
- if (!m_CacheStore.Put(Request.Context,
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- CacheValue,
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
- return PutResult::Conflict;
+ return PutResult.Status;
}
m_CacheStats.WriteCount++;
@@ -472,7 +473,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -772,14 +773,15 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- if (!m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
return;
}
@@ -928,11 +930,11 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- std::vector<size_t> BatchResultIndexes;
- std::vector<bool> Results;
- std::vector<CacheKey> UpstreamCacheKeys;
- uint64_t RequestCount = RequestsArray.Num();
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ std::vector<size_t> BatchResultIndexes;
+ std::vector<ZenCacheStore::PutResult> Results;
+ std::vector<CacheKey> UpstreamCacheKeys;
+ uint64_t RequestCount = RequestsArray.Num();
{
Results.reserve(RequestCount);
std::unique_ptr<ZenCacheStore::PutBatch> Batch;
@@ -987,32 +989,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
RawSize = Chunk.DecodeRawSize();
}
- bool PutSucceeded = m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Overwrite,
- Batch.get());
- if (PutSucceeded)
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(PutSucceeded);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1028,12 +1030,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1068,13 +1070,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
@@ -1084,11 +1086,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1259,15 +1282,16 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- if (m_CacheStore.Put(
- Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}
@@ -1565,14 +1589,15 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- if (!m_CacheStore.Put(Context,
- Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Record.CacheValue},
- ReferencedAttachments,
- Overwrite,
- nullptr))
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
{
return;
}
@@ -1794,14 +1819,16 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
}
else
{
- if (m_CacheStore.Put(Context,
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
Namespace,
Key.Key.Bucket,
Key.Key.Hash,
{.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
{},
Overwrite,
- nullptr))
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
{
m_CacheStats.WriteCount++;
}