aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-05-19 11:37:25 +0200
committerDan Engelbrecht <[email protected]>2022-05-19 11:37:25 +0200
commita9130d34b5318b0da5d3547c432a8734213fbe9b (patch)
tree2cdb96f85e221cc24227b410d4d5f8f4e4af7a41 /zenserver/cache/structuredcache.cpp
parentMerge pull request #98 from EpicGames/de/fix-bucket-name-rules (diff)
downloadzen-a9130d34b5318b0da5d3547c432a8734213fbe9b.tar.xz
zen-a9130d34b5318b0da5d3547c432a8734213fbe9b.zip
Keep Namespace out of CacheKey and store it on request level
RPC requests now has a Namespace field under Params instead of one Namespace per cache key Fall back to legacy upstream HTTP URI format if default namespace is requested
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp221
1 files changed, 125 insertions, 96 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index bc6f31dd3..a349f13e1 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -70,6 +70,7 @@ struct AttachmentCount
struct PutRequestData
{
+ std::string Namespace;
CacheKey Key;
CbObjectView RecordObject;
CacheRecordPolicy Policy;
@@ -244,30 +245,27 @@ namespace {
}
}
- bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
+ std::optional<std::string> GetRpcRequestNamespace(const CbObjectView Params)
{
- CbFieldView NamespaceField = KeyView["Namespace"sv];
- std::optional<std::string> Namespace;
+ CbFieldView NamespaceField = Params["Namespace"sv];
if (!NamespaceField)
{
- Namespace = ZenCacheStore::DefaultNamespace;
+ return std::string(ZenCacheStore::DefaultNamespace);
}
- else
+
+ if (NamespaceField.HasError())
{
- if (NamespaceField.HasError())
- {
- return false;
- }
- if (!NamespaceField.IsString())
- {
- return false;
- }
- Namespace = GetValidNamespaceName(NamespaceField.AsString());
+ return {};
}
- if (!Namespace.has_value())
+ if (!NamespaceField.IsString())
{
- return false;
+ return {};
}
+ return GetValidNamespaceName(NamespaceField.AsString());
+ }
+
+ bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key)
+ {
CbFieldView BucketField = KeyView["Bucket"sv];
if (BucketField.HasError())
{
@@ -292,7 +290,7 @@ namespace {
return false;
}
IoHash Hash = HashField.AsHash();
- Key = CacheKey::Create(*Namespace, *Bucket, Hash);
+ Key = CacheKey::Create(*Bucket, Hash);
return true;
}
@@ -596,7 +594,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
if (GetUpstreamCacheResult UpstreamResult =
- m_UpstreamCache.GetCacheRecord({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, AcceptType);
+ m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType);
UpstreamResult.Success)
{
Success = true;
@@ -769,7 +767,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey}});
+ m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -819,7 +817,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey},
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
.ValueContentIds = std::move(ValidAttachments)});
}
@@ -904,7 +903,8 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .Key = {Ref.Namespace, Ref.BucketSegment, Ref.HashKey},
+ .Namespace = Ref.Namespace,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
.ValueContentIds = std::move(ValidAttachments)});
}
@@ -946,7 +946,7 @@ HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request,
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.Namespace, Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
@@ -1124,8 +1124,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
{
@@ -1139,7 +1144,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
- PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
+ PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
@@ -1203,7 +1208,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
else
{
ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Key.Namespace,
+ Request.Namespace,
Request.Key.Bucket,
Request.Key.Hash,
ToString(HttpContentType::kCbPackage),
@@ -1225,7 +1230,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
}
ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
- Request.Key.Namespace,
+ Request.Namespace,
Request.Key.Bucket,
Request.Key.Hash,
NiceBytes(TransferredSize),
@@ -1237,14 +1242,16 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
+ m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Namespace = Request.Namespace,
+ .Key = Request.Key,
+ .ValueContentIds = std::move(ValidAttachments)});
}
return PutResult::Success;
}
@@ -1277,8 +1284,13 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
bool UsedUpstream = false;
};
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
std::vector<RecordRequestData> Requests;
std::vector<size_t> UpstreamIndexes;
CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
@@ -1322,7 +1334,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
ZenCacheValue RecordCacheValue;
if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) &&
- m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
+ m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, RecordCacheValue))
{
Request.RecordCacheValue = std::move(RecordCacheValue.Value);
if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
@@ -1436,7 +1448,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
}
- const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &ParseValues](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -1453,7 +1465,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
Request.RecordObject = ObjectBuffer;
if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
+ m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
ParseValues(Request);
Request.UsedUpstream = true;
@@ -1493,7 +1505,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
{
ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}/{}'",
Value.ContentId,
- Key.Namespace,
+ *Namespace,
Key.Bucket,
Key.Hash);
}
@@ -1510,7 +1522,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
};
- m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(*Namespace, UpstreamRequests, std::move(OnCacheRecordGetComplete));
}
CbPackage ResponsePackage;
@@ -1533,7 +1545,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
ZEN_DEBUG("HIT - '{}/{}/{}' {}{}{}",
- Key.Namespace,
+ *Namespace,
Key.Bucket,
Key.Hash,
NiceBytes(Request.RecordCacheValue.Size()),
@@ -1549,11 +1561,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}' {}", Key.Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv);
+ ZEN_DEBUG("MISS - '{}/{}/{}' {}", *Namespace, Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv);
m_CacheStats.MissCount++;
}
}
@@ -1579,8 +1591,13 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
- std::string_view PolicyText = Params["DefaultPolicy"].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
{
@@ -1615,21 +1632,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
{
IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
Value.SetContentType(ZenContentType::kCompressedBinary);
- m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Value});
+ m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value});
TransferredSize = Chunk.GetCompressedSize();
}
Succeeded = true;
}
else
{
- ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", Key.Namespace, Key.Bucket, Key.Hash, RawHash);
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
}
else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
ZenCacheValue ExistingValue;
- if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
+ if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) &&
IsCompressedBinary(ExistingValue.Value.GetContentType()))
{
Succeeded = true;
@@ -1640,11 +1657,11 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
{
- m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key});
}
Results.push_back(Succeeded);
ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'",
- Key.Namespace,
+ *Namespace,
Key.Bucket,
Key.Hash,
NiceBytes(TransferredSize),
@@ -1679,9 +1696,15 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
{
ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
+ if (!Namespace)
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
struct RequestData
{
CacheKey Key;
@@ -1717,7 +1740,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ZenCacheValue CacheValue;
if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
{
- if (m_CacheStore.Get(Key.Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
+ if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
}
@@ -1725,7 +1748,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (Result)
{
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- Key.Namespace,
+ *Namespace,
Key.Bucket,
Key.Hash,
NiceBytes(Result.GetCompressed().GetSize()),
@@ -1740,12 +1763,12 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
{
// If they requested no query, do not record this as a miss
- ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", Key.Namespace, Key.Bucket, Key.Hash);
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash);
}
else
{
ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- Key.Namespace,
+ *Namespace,
Key.Bucket,
Key.Hash,
"LOCAL"sv,
@@ -1763,13 +1786,14 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
for (size_t Index : RemoteRequestIndexes)
{
RequestData& Request = Requests[Index];
- RequestedRecordsData.push_back({{Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash}});
+ RequestedRecordsData.push_back({Request.Key.Bucket, Request.Key.Hash});
CacheChunkRequests.push_back(&RequestedRecordsData.back());
}
Stopwatch Timer;
m_UpstreamCache.GetCacheValues(
+ *Namespace,
CacheChunkRequests,
- [this, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
+ [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) {
CacheChunkRequest& ChunkRequest = Params.Request;
if (Params.Value)
{
@@ -1783,9 +1807,9 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
// that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
// for us to keep the data only on the upstream server.
// if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
- m_CacheStore.Put(Request.Key.Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
+ m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value});
ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}",
- ChunkRequest.Key.Namespace,
+ *Namespace,
ChunkRequest.Key.Bucket,
ChunkRequest.Key.Hash,
NiceBytes(Request.Result.GetCompressed().GetSize()),
@@ -1797,7 +1821,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
}
}
ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}",
- ChunkRequest.Key.Namespace,
+ *Namespace,
ChunkRequest.Key.Bucket,
ChunkRequest.Key.Hash,
"UPSTREAM"sv,
@@ -1888,6 +1912,7 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+ std::string Namespace;
std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
std::vector<CacheChunkRequest> RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream
@@ -1897,27 +1922,28 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
}
// For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
// have it locally, and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks);
+ GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks);
// For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks
- GetLocalCacheValues(ValueRequests, UpstreamChunks);
+ GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks);
// Call GetCacheChunks on the upstream for any payloads we do not have locally
- GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests);
+ GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(Requests, HttpRequest);
+ WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest);
}
bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyRequest>& RecordKeys,
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests,
@@ -1929,11 +1955,20 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
- CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
- CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
- size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+
+ std::optional<std::string> NamespaceText = GetRpcRequestNamespace(Params);
+ if (!NamespaceText)
+ {
+ ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest.");
+ return false;
+ }
+ Namespace = *NamespaceText;
+
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ size_t NumRequests = static_cast<size_t>(ChunkRequestsArray.Num());
// Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed,
// we will need to change the pointers to indexes to handle reallocations.
@@ -1996,11 +2031,9 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
}
else
{
- ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{}/{} came after {}/{}/{}.",
- RequestKey.Key.Namespace,
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
RequestKey.Key.Bucket,
RequestKey.Key.Hash,
- PreviousRecordKey->Key.Namespace,
PreviousRecordKey->Key.Bucket,
PreviousRecordKey->Key.Hash);
return false;
@@ -2022,7 +2055,8 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector<CacheKeyReque
}
void
-HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>& RecordKeys,
+HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace,
+ std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<cache::detail::ChunkRequest*>& RecordRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
@@ -2041,7 +2075,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(RecordKey.Key.Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue))
{
Record.Exists = true;
Record.CacheValue = std::move(CacheValue.Value);
@@ -2058,7 +2092,7 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (!UpstreamRecordRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) {
if (!Params.Record)
{
return;
@@ -2076,10 +2110,10 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Key.Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
+ m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue});
}
};
- m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
}
std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
@@ -2163,7 +2197,8 @@ HttpStructuredCacheService::GetLocalCacheRecords(std::vector<CacheKeyRequest>&
}
void
-HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::ChunkRequest*>& ValueRequests,
+HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest*>& ValueRequests,
std::vector<CacheChunkRequest*>& OutUpstreamChunks)
{
using namespace cache::detail;
@@ -2173,7 +2208,7 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk
if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal))
{
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(Request->Key->Key.Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
+ if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue))
{
if (IsCompressedBinary(CacheValue.Value.GetContentType()))
{
@@ -2207,7 +2242,8 @@ HttpStructuredCacheService::GetLocalCacheValues(std::vector<cache::detail::Chunk
}
void
-HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest*>& UpstreamChunks,
+HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace,
+ std::vector<CacheChunkRequest*>& UpstreamChunks,
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests)
{
@@ -2215,7 +2251,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
if (!UpstreamChunks.empty())
{
- const auto OnCacheValueGetComplete = [this, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
+ const auto OnCacheValueGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheValueGetCompleteParams&& Params) {
if (Params.RawHash == Params.RawHash.Zero)
{
return;
@@ -2242,7 +2278,7 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
}
else
{
- m_CacheStore.Put(Key.Key.Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
+ m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value});
}
}
if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData))
@@ -2259,12 +2295,13 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector<CacheChunkRequest
m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(Namespace, UpstreamChunks, std::move(OnCacheValueGetComplete));
}
}
void
-HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detail::ChunkRequest>& Requests,
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace,
+ std::vector<cache::detail::ChunkRequest>& Requests,
zen::HttpServerRequest& HttpRequest)
{
using namespace cache::detail;
@@ -2290,7 +2327,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai
}
ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})",
- Request.Key->Key.Namespace,
+ Namespace,
Request.Key->Key.Bucket,
Request.Key->Key.Hash,
Request.Key->ValueId,
@@ -2301,19 +2338,11 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector<cache::detai
}
else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query))
{
- ZEN_DEBUG("SKIP - '{}/{}/{}/{}'",
- Request.Key->Key.Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId);
+ ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
}
else
{
- ZEN_DEBUG("MISS - '{}/{}/{}/{}'",
- Request.Key->Key.Namespace,
- Request.Key->Key.Bucket,
- Request.Key->Key.Hash,
- Request.Key->ValueId);
+ ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId);
m_CacheStats.MissCount++;
}
}