aboutsummaryrefslogtreecommitdiff
path: root/src/zenstore/cache/cacherpc.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenstore/cache/cacherpc.cpp')
-rw-r--r--src/zenstore/cache/cacherpc.cpp234
1 files changed, 152 insertions, 82 deletions
diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp
index de4b0a37c..5d9a68919 100644
--- a/src/zenstore/cache/cacherpc.cpp
+++ b/src/zenstore/cache/cacherpc.cpp
@@ -334,13 +334,13 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
.Policy = std::move(Policy),
.Context = Context};
- PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+ PutStatus Result = PutCacheRecord(PutRequest, &BatchRequest);
- if (Result == PutResult::Invalid)
+ if (Result == PutStatus::Invalid)
{
return CbPackage{};
}
- Results.push_back(Result == PutResult::Success);
+ Results.push_back(Result == PutStatus::Success);
}
if (Results.empty())
{
@@ -360,7 +360,7 @@ CacheRpcHandler::HandleRpcPutCacheRecords(const CacheRequestContext& Context, co
return RpcResponse;
}
-PutResult
+PutStatus
CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
CbObjectView Record = Request.RecordObject;
@@ -422,14 +422,28 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
if (Count.Invalid > 0)
{
- return PutResult::Invalid;
+ return PutStatus::Invalid;
}
ZenCacheValue CacheValue;
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Context, Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue, ReferencedAttachments, nullptr);
+ bool Overwrite = EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreLocal) &&
+ !EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal);
+ // TODO: Propagation for rejected PUTs
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Request.Context,
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ CacheValue,
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return PutResult.Status;
+ }
m_CacheStats.WriteCount++;
if (!WriteAttachmentBuffers.empty())
@@ -466,7 +480,7 @@ CacheRpcHandler::PutCacheRecord(PutRequestData& Request, const CbPackage* Packag
.Key = Request.Key,
.ValueContentIds = std::move(ValidAttachments)});
}
- return PutResult::Success;
+ return PutStatus::Success;
}
CbPackage
@@ -765,18 +779,24 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb
EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::QueryLocal);
std::vector<IoHash> ReferencedAttachments;
ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
const IoHash ValueHash = HashView.AsHash();
ReferencedAttachments.push_back(ValueHash);
});
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = {Request.RecordCacheValue}},
- ReferencedAttachments,
- nullptr);
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = {Request.RecordCacheValue}},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
m_CacheStats.WriteCount++;
}
ParseValues(Request);
@@ -924,10 +944,10 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
const bool HasUpstream = m_UpstreamCache.IsActive();
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
- std::vector<bool> BatchResults;
- eastl::fixed_vector<size_t, 32> BatchResultIndexes;
- eastl::fixed_vector<bool, 32> Results;
- eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
+ std::vector<ZenCacheStore::PutResult> BatchResults;
+ eastl::fixed_vector<size_t, 32> BatchResultIndexes;
+ eastl::fixed_vector<ZenCacheStore::PutResult, 32> Results;
+ eastl::fixed_vector<CacheKey, 32> UpstreamCacheKeys;
uint64_t RequestCount = RequestsArray.Num();
{
@@ -977,34 +997,39 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ bool Overwrite = !EnumHasAllFlags(Policy, CachePolicy::QueryLocal);
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
if (RawSize == 0)
{
RawSize = Chunk.DecodeRawSize();
}
- m_CacheStore.Put(Context,
- *Namespace,
- Key.Bucket,
- Key.Hash,
- {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
- {},
- Batch.get());
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ *Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Value, .RawSize = RawSize, .RawHash = RawHash},
+ {},
+ Overwrite,
+ Batch.get());
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
if (Batch)
{
BatchResultIndexes.push_back(Results.size());
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail});
}
else
{
- Results.push_back(true);
+ Results.push_back(PutResult);
}
TransferredSize = Chunk.GetCompressedSize();
}
else
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
}
Valid = true;
}
@@ -1020,12 +1045,12 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
if (m_CacheStore.Get(Context, *Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
- Results.push_back(true);
+ Results.push_back({zen::PutStatus::Success});
Valid = true;
}
else
{
- Results.push_back(false);
+ Results.push_back({zen::PutStatus::Fail, fmt::format("Missing attachment with raw hash {}", RawHash)});
}
}
// We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
@@ -1060,13 +1085,13 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
{
size_t BatchResultIndex = BatchResultIndexes[Index];
ZEN_ASSERT(BatchResultIndex < Results.size());
- ZEN_ASSERT(Results[BatchResultIndex] == false);
+ ZEN_ASSERT(Results[BatchResultIndex].Status != zen::PutStatus::Success);
Results[BatchResultIndex] = BatchResults[Index];
}
for (std::size_t Index = 0; Index < Results.size(); Index++)
{
- if (Results[Index] && UpstreamCacheKeys[Index] != CacheKey::Empty)
+ if ((Results[Index].Status == zen::PutStatus::Success) && UpstreamCacheKeys[Index] != CacheKey::Empty)
{
m_UpstreamCache.EnqueueUpstream(
{.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = UpstreamCacheKeys[Index]});
@@ -1076,11 +1101,32 @@ CacheRpcHandler::HandleRpcPutCacheValues(const CacheRequestContext& Context, con
ZEN_TRACE_CPU("Z$::RpcPutCacheValues::Response");
CbObjectWriter ResponseObject{1024};
ResponseObject.BeginArray("Result"sv);
- for (bool Value : Results)
+ bool bAnyErrors = false;
+ for (const ZenCacheStore::PutResult& Value : Results)
{
- ResponseObject.AddBool(Value);
+ if (Value.Status == zen::PutStatus::Success)
+ {
+ ResponseObject.AddBool(true);
+ }
+ else
+ {
+ bAnyErrors = true;
+ ResponseObject.AddBool(false);
+ }
}
ResponseObject.EndArray();
+ if (bAnyErrors)
+ {
+ ResponseObject.BeginArray("ErrorMessages"sv);
+ for (const ZenCacheStore::PutResult& Value : Results)
+ {
+ if (Value.Status != zen::PutStatus::Success)
+ {
+ ResponseObject.AddString(Value.Message);
+ }
+ }
+ ResponseObject.EndArray();
+ }
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
@@ -1239,6 +1285,7 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
const bool HasData = IsCompressedBinary(Params.Value.GetContentType());
const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData);
const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ const bool Overwrite = StoreData && !EnumHasAllFlags(Request.Policy, CachePolicy::QueryLocal);
const bool IsHit = SkipData || HasData;
if (IsHit)
{
@@ -1249,14 +1296,19 @@ CacheRpcHandler::HandleRpcGetCacheValues(const CacheRequestContext& Context, CbO
if (HasData && StoreData)
{
- m_CacheStore.Put(Context,
- *Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(
+ Context,
+ *Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ZenCacheValue{.Value = Params.Value, .RawSize = Request.RawSize, .RawHash = Request.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
@@ -1527,36 +1579,48 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context,
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](
- CacheRecordGetCompleteParams&& Params) {
- if (!Params.Record)
- {
- return;
- }
- CacheKeyRequest& RecordKey = Params.Request;
- size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
- RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
- RecordBody& Record = Records[RecordIndex];
-
- const CacheKey& Key = RecordKey.Key;
- Record.Exists = true;
- CbObject ObjectBuffer = CbObject::Clone(Params.Record);
- Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
- Record.CacheValue.SetContentType(ZenContentType::kCbObject);
- Record.Source = Params.Source;
-
- bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
- if (StoreLocal)
- {
- std::vector<IoHash> ReferencedAttachments;
- ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- ReferencedAttachments.push_back(ValueHash);
- });
- m_CacheStore.Put(Context, Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}, ReferencedAttachments, nullptr);
- m_CacheStats.WriteCount++;
- }
- };
+ const auto OnCacheRecordGetComplete =
+ [this, Namespace, &RecordKeys, &Records, &RecordRequests, Context](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
+ CacheKeyRequest& RecordKey = Params.Request;
+ size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey);
+ RecordRequests[RecordIndex]->ElapsedTimeUs += static_cast<uint64_t>(Params.ElapsedSeconds * 1000000.0);
+ RecordBody& Record = Records[RecordIndex];
+
+ const CacheKey& Key = RecordKey.Key;
+ Record.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Record.CacheValue.SetContentType(ZenContentType::kCbObject);
+ Record.Source = Params.Source;
+
+ bool StoreLocal = EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
+ if (StoreLocal)
+ {
+ bool Overwrite = !EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal);
+ std::vector<IoHash> ReferencedAttachments;
+ ObjectBuffer.IterateAttachments([&ReferencedAttachments](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ ReferencedAttachments.push_back(ValueHash);
+ });
+ ZenCacheStore::PutResult PutResult = m_CacheStore.Put(Context,
+ Namespace,
+ Key.Bucket,
+ Key.Hash,
+ {.Value = Record.CacheValue},
+ ReferencedAttachments,
+ Overwrite,
+ nullptr);
+ if (PutResult.Status != zen::PutStatus::Success)
+ {
+ return;
+ }
+ m_CacheStats.WriteCount++;
+ }
+ };
m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
@@ -1774,20 +1838,26 @@ CacheRpcHandler::GetUpstreamCacheChunks(const CacheRequestContext& Context,
bool StoreLocal = EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) && AreDiskWritesAllowed();
if (StoreLocal)
{
+ bool Overwrite = !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::QueryLocal);
if (Request.IsRecordRequest)
{
m_CidStore.AddChunk(Params.Value, Params.RawHash);
}
else
{
- m_CacheStore.Put(Context,
- Namespace,
- Key.Key.Bucket,
- Key.Key.Hash,
- {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
- {},
- nullptr);
- m_CacheStats.WriteCount++;
+ ZenCacheStore::PutResult PutResult =
+ m_CacheStore.Put(Context,
+ Namespace,
+ Key.Key.Bucket,
+ Key.Key.Hash,
+ {.Value = Params.Value, .RawSize = Params.RawSize, .RawHash = Params.RawHash},
+ {},
+ Overwrite,
+ nullptr);
+ if (PutResult.Status == zen::PutStatus::Success)
+ {
+ m_CacheStats.WriteCount++;
+ }
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))