aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authormattpetersepic <[email protected]>2022-02-01 08:06:36 -0700
committerGitHub <[email protected]>2022-02-01 08:06:36 -0700
commit154743f2d2ff2b7163bcf8d7b76eea3e3579aaba (patch)
treeaef417b5c9a0d5502c7afdb01c4cc598071e956d /zenserver/cache/structuredcache.cpp
parentTweaked remote_build.py TTY output (diff)
downloadzen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.tar.xz
zen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.zip
Cache policy support (#47)
Add HandleRpc methods for the remaining ICacheStore requests from unreal: PutCacheValues/GetCacheValues. We now have batched versions for PutCacheRecords,GetCacheRecords,PutCacheValues,GetCacheValues,GetCacheChunks. Add support for CachePolicy flags to all of these batched methods. * Add Batched PutCacheValues/GetCacheValues. Rename old GetCacheValues to GetCacheChunks. * HandleRpcGetCacheRecords: Receive a CacheRecordPolicy with each key, and skipdata on attachments we already have. * Changes to CachePolicy copied from Release-5.0 depot. Change serialization to use the key BasePolicy instead of DefaultValuePolicy. * GetChunks: Read CacheRecords from remote if necessary to find ContentId. Implement QueryLocal, StoreLocal, and SkipData.
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp1170
1 files changed, 953 insertions, 217 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index facb29e1d..49e5896d1 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -48,6 +48,13 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default;
}
+CacheRecordPolicy
+LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default)
+{
+ OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object);
+ return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy);
+}
+
struct AttachmentCount
{
uint32_t New = 0;
@@ -829,10 +836,18 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
HandleRpcGetCacheRecords(AsyncRequest, Object);
}
+ else if (Method == "PutCacheValues"sv)
+ {
+ HandleRpcPutCacheValues(AsyncRequest, Package);
+ }
else if (Method == "GetCacheValues"sv)
{
HandleRpcGetCacheValues(AsyncRequest, Object);
}
+ else if (Method == "GetCacheChunks"sv)
+ {
+ HandleRpcGetCacheChunks(AsyncRequest, Object);
+ }
else
{
AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
@@ -872,7 +887,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
@@ -966,7 +981,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
Count.Total);
ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
+ CacheValue.Value = IoBuffer(Record.GetSize());
Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue);
@@ -981,14 +996,15 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Success;
}
+#if BACKWARDS_COMPATABILITY_JAN2022
void
-HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbPackage RpcResponse;
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
- CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
+ CacheRecordPolicy BatchPolicy = LoadCacheRecordPolicy(Params["Policy"sv].AsObjectView());
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
@@ -1014,7 +1030,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
uint32_t MissingCount = 0;
uint32_t MissingReadFromUpstreamCount = 0;
- if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) &&
+ CacheValue.Value.GetContentType() == ZenContentType::kCbObject)
{
CbObjectView CacheRecord(CacheValue.Value.Data());
CacheRecord.IterateAttachments(
@@ -1060,12 +1077,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
});
}
- if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) ||
- MissingReadFromUpstreamCount != 0)
- {
- UpstreamRequests.push_back(KeyIndex);
- }
- else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
+ // Searching upstream is not implemented in this legacy support function
+ if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
Key.Bucket,
@@ -1094,116 +1107,445 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
++KeyIndex;
}
- if (!UpstreamRequests.empty())
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+ for (const IoBuffer& Value : CacheValues)
{
- const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) {
- ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
+ if (Value)
+ {
+ CbObjectView Record(Value.Data());
+ ResponseObject << Record;
+ }
+ else
+ {
+ ResponseObject.AddNull();
+ }
+ }
+ ResponseObject.EndArray();
- IoBuffer CacheValue;
- AttachmentCount Count;
+ RpcResponse.SetObject(ResponseObject.Save());
- if (Params.Record)
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+#endif
+
+void
+HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+{
+#if BACKWARDS_COMPATABILITY_JAN2022
+ // Backwards compatability;
+ if (RpcRequest["Params"sv].AsObjectView()["CacheKeys"sv])
+ {
+ return HandleRpcGetCacheRecordsLegacy(HttpRequest, RpcRequest);
+ }
+#endif
+ ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+
+ struct ValueRequestData
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ CompressedBuffer Payload;
+ CachePolicy DownstreamPolicy;
+ bool Exists = false;
+ bool ReadFromUpstream = false;
+ };
+ struct RecordRequestData
+ {
+ CacheKeyRequest Upstream;
+ CbObjectView RecordObject;
+ IoBuffer RecordCacheValue;
+ CacheRecordPolicy DownstreamPolicy;
+ std::vector<ValueRequestData> Values;
+ bool Complete = false;
+ bool UsedUpstream = false;
+ };
+
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::vector<RecordRequestData> Requests;
+ std::vector<size_t> UpstreamIndexes;
+ CbArrayView RequestsArray = Params["Requests"sv].AsArrayView();
+ Requests.reserve(RequestsArray.Num());
+
+ auto ParseValues = [](RecordRequestData& Request) {
+ CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView();
+ Request.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ Request.Values.push_back({ValueId, RawHash});
+ Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId);
+ }
+ }
+ };
+
+ for (CbFieldView RequestField : RequestsArray)
+ {
+ RecordRequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyObject["Bucket"sv];
+ CbFieldView HashField = KeyObject["Hash"sv];
+ CacheKey& Key = Request.Upstream.Key;
+ Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (HashField.HasError() || Key.Bucket.empty())
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ const CacheRecordPolicy& Policy = Request.DownstreamPolicy;
+
+ ZenCacheValue CacheValue;
+ bool NeedUpstreamAttachment = false;
+ bool FoundLocalInvalid = false;
+ ZenCacheValue RecordCacheValue;
+
+ if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue))
+ {
+ Request.RecordCacheValue = std::move(RecordCacheValue.Value);
+ if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject)
{
- Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) {
- CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
- bool FoundInUpstream = false;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ FoundLocalInvalid = true;
+ }
+ else
+ {
+ Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData());
+ ParseValues(Request);
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
{
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
- FoundInUpstream = true;
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
- {
- FoundInUpstream = true;
- if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
- {
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
- {
- Count.New++;
- }
- }
- Count.Valid++;
-
- if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
- {
- RpcResponse.AddAttachment(CbAttachment(Compressed));
- }
- }
- else
+ Value.Exists = true;
+ }
+ else
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ Request.Complete = false;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (m_CidStore.ContainsChunk(Value.ContentId))
+ {
+ Value.Exists = true;
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Params.Key.Bucket,
- Params.Key.Hash);
- Count.Invalid++;
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
}
+ Request.Complete = false;
}
}
- if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) &&
- m_CidStore.ContainsChunk(HashView.AsHash()))
+ else
{
- // We added the attachment for this Value in the local loop before calling m_UpstreamCache
- Count.Valid++;
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId))
+ {
+ ZEN_ASSERT(Chunk.GetSize() > 0);
+ Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ Value.Exists = true;
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ NeedUpstreamAttachment = true;
+ Value.ReadFromUpstream = true;
+ }
+ Request.Complete = false;
+ }
}
- Count.Total++;
- });
+ }
+ }
+ }
+ if (!Request.Complete)
+ {
+ bool NeedUpstreamRecord =
+ !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
+ if (NeedUpstreamRecord || NeedUpstreamAttachment)
+ {
+ UpstreamIndexes.push_back(Requests.size() - 1);
+ }
+ }
+ }
+ if (Requests.empty())
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ if (!UpstreamIndexes.empty())
+ {
+ std::vector<CacheKeyRequest*> UpstreamRequests;
+ UpstreamRequests.reserve(UpstreamIndexes.size());
+ for (size_t Index : UpstreamIndexes)
+ {
+ RecordRequestData& Request = Requests[Index];
+ UpstreamRequests.push_back(&Request.Upstream);
- if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))
+ if (Request.Values.size())
+ {
+ // We will be returning the local object and know all the value Ids that exist in it
+ // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have.
+ CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta;
+ CacheRecordPolicyBuilder Builder(UpstreamBasePolicy);
+ for (ValueRequestData& Value : Request.Values)
{
- CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
+ CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy);
+ UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None;
+ Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy);
}
+ Request.Upstream.Policy = Builder.Build();
+ }
+ else
+ {
+ // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants,
+ // and convert the CacheRecordPolicy to an upstream policy
+ Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream();
}
+ }
- if (CacheValue)
+ const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
{
- ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)",
- Params.Key.Bucket,
- Params.Key.Hash,
- NiceBytes(CacheValue.GetSize()),
- ToString(HttpContentType::kCbPackage),
- Count.New,
- Count.Valid,
- Count.Total);
-
- CacheValue.SetContentType(ZenContentType::kCbObject);
- CacheValues[Params.KeyIndex] = CacheValue;
- if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ return;
+ }
+
+ RecordRequestData& Request =
+ *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream));
+ const CacheKey& Key = Request.Upstream.Key;
+ if (!Request.RecordObject)
+ {
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject);
+ Request.RecordObject = ObjectBuffer;
+ if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
{
- m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}});
}
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
+ ParseValues(Request);
+ Request.UsedUpstream = true;
}
- else
+
+ Request.Complete = true;
+ for (ValueRequestData& Value : Request.Values)
{
- const bool IsPartial = Count.Valid != Count.Total;
- ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv);
- m_CacheStats.MissCount++;
+ if (Value.Exists)
+ {
+ continue;
+ }
+ CachePolicy ValuePolicy = Value.DownstreamPolicy;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ Request.Complete = false;
+ continue;
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ Request.UsedUpstream = true;
+ Value.Exists = true;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ m_CidStore.AddChunk(Compressed);
+ }
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Value.Payload = Compressed;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash);
+ }
+ }
+ if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ Request.Complete = false;
+ }
+ // Request.Complete does not need to be set to false for upstream SkipData attachments.
+ // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment
+ // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of
+ // any missing SkipData attachments.
+ }
}
};
- m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete));
}
+ CbPackage ResponsePackage;
CbObjectWriter ResponseObject;
ResponseObject.BeginArray("Result"sv);
- for (const IoBuffer& Value : CacheValues)
+ for (RecordRequestData& Request : Requests)
{
- if (Value)
+ const CacheKey& Key = Request.Upstream.Key;
+ if (Request.Complete ||
+ (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
- CbObjectView Record(Value.Data());
- ResponseObject << Record;
+ ResponseObject << Request.RecordObject;
+ for (ValueRequestData& Value : Request.Values)
+ {
+ if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload)
+ {
+ ResponsePackage.AddAttachment(CbAttachment(Value.Payload));
+ }
+ }
+
+ ZEN_DEBUG("HIT - '{}/{}' {}{}{}",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(Request.RecordCacheValue.Size()),
+ Request.Complete ? ""sv : " (PARTIAL)"sv,
+ Request.UsedUpstream ? " (UPSTREAM)"sv : ""sv);
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount += Request.UsedUpstream ? 1 : 0;
}
else
{
ResponseObject.AddNull();
+
+ if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv);
+ m_CacheStats.MissCount++;
+ }
}
}
ResponseObject.EndArray();
+ ResponsePackage.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ ResponsePackage.Save(MemStream);
+
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+void
+HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : Params["Requests"sv])
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyView = RequestObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ CbFieldView HashField = KeyView["Hash"sv];
+ CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ PolicyText = RequestObject["Policy"sv].AsString();
+ CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+ IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment();
+ bool Succeeded = false;
+ uint64_t TransferredSize = 0;
+
+ if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ // TODO: Implement upstream puts of CacheValues with StoreLocal == false.
+ // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream.
+ Policy |= CachePolicy::StoreLocal;
+ }
+
+ if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
+ {
+ IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ Value.SetContentType(ZenContentType::kCompressedBinary);
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value});
+ TransferredSize = Chunk.GetCompressedSize();
+ }
+ Succeeded = true;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash);
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ }
+ else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ ZenCacheValue ExistingValue;
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType()))
+ {
+ Succeeded = true;
+ }
+ }
+ // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put.
+ // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream.
+
+ if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote))
+ {
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key});
+ }
+ Results.push_back(Succeeded);
+ ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid");
+ }
+ if (Results.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
BinaryWriter MemStream;
@@ -1215,216 +1557,610 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
-
- ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
+#if BACKWARDS_COMPATABILITY_JAN2022
+ if (RpcRequest["Params"sv].AsObjectView()["ChunkRequests"])
+ {
+ return HandleRpcGetCacheChunks(HttpRequest, RpcRequest);
+ }
+#endif
- std::vector<CacheChunkRequest> ChunkRequests;
- std::vector<size_t> UpstreamRequests;
- std::vector<IoBuffer> Chunks;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
- for (CbFieldView RequestView : Params["ChunkRequests"sv])
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ struct RequestData
{
- CbObjectView RequestObject = RequestView.AsObjectView();
- CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
- const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
- const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
- const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
- const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
- const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
- std::string_view PolicyText = RequestObject["Policy"sv].AsString();
- const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ CacheKey Key;
+ CachePolicy Policy;
+ CompressedBuffer Result;
+ };
+ std::vector<RequestData> Requests;
- // Note we could use emplace_back here but [Apple] LLVM-12's C++ library
- // can't infer a constructor like other platforms (or can't handle an
- // initializer list like others do).
- ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy});
- }
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
- if (ChunkRequests.empty())
+ for (CbFieldView RequestField : Params["Requests"sv])
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ RequestData& Request = Requests.emplace_back();
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyObject["Bucket"sv];
+ CbFieldView HashField = KeyObject["Hash"sv];
+ Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty())
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ PolicyText = RequestObject["Policy"sv].AsString();
+ Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
- Chunks.resize(ChunkRequests.size());
+ CacheKey& Key = Request.Key;
+ CachePolicy Policy = Request.Policy;
+ CompressedBuffer& Result = Request.Result;
- // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
- // is missing, load the cache record and try to find the raw hash from the ValueId.
- {
- const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash {
- if (ValueId)
+ ZenCacheValue CacheValue;
+ std::string_view Source;
+ if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
+ {
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType()))
{
- // A valid ValueId indicates that the caller is searching for a Value in a Record
- // that was Put with ICacheStore::Put
- for (CbFieldView ValueView : Record["Values"sv])
+ Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value));
+ if (Result)
{
- CbObjectView ValueObject = ValueView.AsObjectView();
- const Oid Id = ValueObject["Id"sv].AsObjectId();
-
- if (Id == ValueId)
- {
- return ValueObject["RawHash"sv].AsHash();
- }
+ Source = "LOCAL"sv;
}
-
- // Legacy fields from previous version of CacheRecord serialization:
- if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
+ }
+ }
+ if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote))
+ {
+ GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
+ if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
+ {
+ Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
+ if (Result)
{
- const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == ValueId)
+ UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary);
+ Source = "UPSTREAM"sv;
+ // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
+ // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
+ // for us to keep the data only on the upstream server.
+ // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal))
{
- return ValueObject["RawHash"sv].AsHash();
+ m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value});
}
}
+ }
+ }
- for (CbFieldView AttachmentView : Record["Attachments"sv])
- {
- CbObjectView AttachmentObject = AttachmentView.AsObjectView();
- const Oid Id = AttachmentObject["Id"sv].AsObjectId();
+ if (Result)
+ {
+ ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source);
+ m_CacheStats.HitCount++;
+ }
+ else if (!EnumHasAnyFlags(Policy, CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash);
+ m_CacheStats.MissCount++;
+ }
+ }
+ if (Requests.empty())
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
- if (Id == ValueId)
- {
- return AttachmentObject["RawHash"sv].AsHash();
- }
- }
- return IoHash::Zero;
- }
- else
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (const RequestData& Request : Requests)
+ {
+ ResponseObject.BeginObject();
+ {
+ const CompressedBuffer& Result = Request.Result;
+ if (Result)
{
- // An invalid ValueId indicates that the caller is requesting a Value that
- // was Put with ICacheStore::PutValue
- return Record["RawHash"sv].AsHash();
+ ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash()));
+ if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Result));
+ }
+ else
+ {
+ ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize());
+ }
}
- };
+ }
+ ResponseObject.EndObject();
+ }
+ ResponseObject.EndArray();
- CacheKey CurrentKey = CacheKey::Empty;
- IoBuffer CurrentRecordBuffer;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+namespace GetCacheChunks::detail {
+
+ struct ValueData
+ {
+ Oid ValueId;
+ IoHash ContentId;
+ uint64_t RawSize;
+ };
+ struct KeyRequestData
+ {
+ CacheKeyRequest Upstream;
+ IoBuffer CacheValue;
+ std::vector<ValueData> Values;
+ CachePolicy DownstreamRecordPolicy;
+ CachePolicy DownstreamPolicy;
+ std::string_view Source;
+ bool Exists = false;
+ bool HasRequest = false;
+ bool HasRecordRequest = false;
+ bool HasValueRequest = false;
+ bool ValuesRead = false;
+ };
+ struct ChunkRequestData
+ {
+ CacheChunkRequest Upstream;
+ KeyRequestData* KeyRequest;
+ size_t KeyRequestIndex;
+ CachePolicy DownstreamPolicy;
+ CompressedBuffer Value;
+ std::string_view Source;
+ uint64_t TotalSize = 0;
+ bool Exists = false;
+ bool IsRecordRequest = false;
+ bool TotalSizeKnown = false;
+ };
+
+} // namespace GetCacheChunks::detail
+
+void
+HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+{
+ using namespace GetCacheChunks::detail;
+
+ ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
+
+ std::vector<KeyRequestData> KeyRequests;
+ std::vector<ChunkRequestData> Chunks;
+ BACKWARDS_COMPATABILITY_JAN2022_CODE(bool SendValueOnly = false;)
+ if (!TryGetCacheChunks_Parse(KeyRequests, Chunks BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly), RpcRequest))
+ {
+ return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ GetCacheChunks_LoadKeys(KeyRequests);
+ GetCacheChunks_LoadChunks(Chunks);
+ GetCacheChunks_SendResults(Chunks, HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly));
+}
+
+bool
+HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests,
+ std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
+ BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest)
+{
+ using namespace GetCacheChunks::detail;
+
+#if BACKWARDS_COMPATABILITY_JAN2022
+ SendValueOnly = RpcRequest["MethodVersion"sv].AsInt32() < 1;
+#else
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
+#endif
+
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
+ CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
+
+ KeyRequestData* PreviousKeyRequest = nullptr;
+ CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView();
+ Chunks.reserve(ChunkRequestsArray.Num());
+ for (CbFieldView RequestView : ChunkRequestsArray)
+ {
+ ChunkRequestData& Chunk = Chunks.emplace_back();
+ CbObjectView RequestObject = RequestView.AsObjectView();
+
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ CbFieldView HashField = KeyObject["Hash"sv];
+ Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash());
+ if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError())
+ {
+ ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest.");
+ return false;
+ }
- for (CacheChunkRequest& ChunkRequest : ChunkRequests)
+ KeyRequestData* KeyRequest = nullptr;
+ if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key)
{
- if (ChunkRequest.ChunkId != IoHash::Zero)
+ KeyRequest = &KeyRequests.emplace_back();
+ KeyRequest->Upstream.Key = Chunk.Upstream.Key;
+ PreviousKeyRequest = KeyRequest;
+ }
+ else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key))
+ {
+ KeyRequest = PreviousKeyRequest;
+ }
+ else
+ {
+ ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.",
+ Chunk.Upstream.Key.Bucket,
+ Chunk.Upstream.Key.Hash,
+ PreviousKeyRequest->Upstream.Key.Bucket,
+ PreviousKeyRequest->Upstream.Key.Hash);
+ return false;
+ }
+ Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest);
+
+ Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash();
+ Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId();
+ Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX);
+ std::string_view PolicyText = RequestObject["Policy"sv].AsString();
+ Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy;
+#if BACKWARDS_COMPATABILITY_JAN2022
+ if (SendValueOnly)
+ {
+ Chunk.DownstreamPolicy = Chunk.DownstreamPolicy & (~CachePolicy::SkipData);
+ }
+#endif
+ Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId;
+
+ if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero)
+ {
+ KeyRequest->DownstreamPolicy =
+ KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy;
+ KeyRequest->HasRequest = true;
+ (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true;
+ }
+ }
+ if (Chunks.empty())
+ {
+ return false;
+ }
+ for (ChunkRequestData& Chunk : Chunks)
+ {
+ Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex];
+ }
+ return true;
+}
+
+void
+HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests)
+{
+ using namespace GetCacheChunks::detail;
+
+ std::vector<CacheKeyRequest*> UpstreamRecordRequests;
+ std::vector<KeyRequestData*> UpstreamValueRequests;
+ for (KeyRequestData& KeyRequest : KeyRequests)
+ {
+ if (KeyRequest.HasRequest)
+ {
+ if (KeyRequest.HasRecordRequest)
{
- continue;
+ KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta;
}
- if (ChunkRequest.Key != CurrentKey)
+ if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal))
{
- CurrentKey = ChunkRequest.Key;
-
+ // There's currently no interface for checking only whether a CacheValue exists without loading it,
+ // so we load it here even if SkipData is true and its a CacheValue request.
ZenCacheValue CacheValue;
- if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue))
+ if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue))
{
- CurrentRecordBuffer = CacheValue.Value;
+ KeyRequest.Exists = true;
+ KeyRequest.CacheValue = std::move(CacheValue.Value);
+ KeyRequest.Source = "LOCAL"sv;
}
}
-
- if (CurrentRecordBuffer)
+ if (!KeyRequest.Exists)
{
- ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId);
+ // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't
+ // know which,
+ // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both
+ // kinds and pass the request that uses the one that succeeds.
+ if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote))
+ {
+ KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy));
+ UpstreamRecordRequests.push_back(&KeyRequest.Upstream);
+ }
+ if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ UpstreamValueRequests.push_back(&KeyRequest);
+ }
}
}
}
- for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
+ if (!UpstreamRecordRequests.empty())
{
- const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal);
- const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote);
+ const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) {
+ if (!Params.Record)
+ {
+ return;
+ }
- if (QueryLocal)
+ KeyRequestData& KeyRequest =
+ *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream));
+ const CacheKey& Key = KeyRequest.Upstream.Key;
+ KeyRequest.Exists = true;
+ CbObject ObjectBuffer = CbObject::Clone(Params.Record);
+ KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer();
+ KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject);
+ KeyRequest.Source = "UPSTREAM"sv;
+
+ if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
+ }
+ };
+ m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete));
+ }
+
+ if (!UpstreamValueRequests.empty())
+ {
+ for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests)
{
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId))
+ KeyRequestData& KeyRequest = *KeyRequestPtr;
+ CacheKey& Key = KeyRequest.Upstream.Key;
+ GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary);
+ if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType()))
{
- ZEN_ASSERT(Chunk.GetSize() > 0);
-
- ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
- ChunkRequest.Key.Bucket,
- ChunkRequest.Key.Hash,
- ChunkRequest.ChunkId,
- NiceBytes(Chunk.Size()),
- ToString(Chunk.GetContentType()),
- "LOCAL");
-
- Chunks[RequestIndex] = Chunk;
- m_CacheStats.HitCount++;
+ CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value));
+ if (Result)
+ {
+ KeyRequest.CacheValue = std::move(UpstreamResult.Value);
+ KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary);
+ KeyRequest.Exists = true;
+ KeyRequest.Source = "UPSTREAM"sv;
+ // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement
+ // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive
+ // for us to keep the data only on the upstream server.
+ // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal))
+ {
+ m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue});
+ }
+ }
}
- else if (QueryRemote)
+ }
+ }
+}
+
+void
+HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks)
+{
+ using namespace GetCacheChunks::detail;
+
+ std::vector<CacheChunkRequest*> UpstreamPayloadRequests;
+ for (ChunkRequestData& Chunk : Chunks)
+ {
+ if (Chunk.IsRecordRequest)
+ {
+ if (Chunk.Upstream.ChunkId == IoHash::Zero)
{
- UpstreamRequests.push_back(RequestIndex);
+ // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
+ // is missing, parse the cache record and try to find the raw hash from the ValueId.
+ KeyRequestData& KeyRequest = *Chunk.KeyRequest;
+ if (!KeyRequest.ValuesRead)
+ {
+ KeyRequest.ValuesRead = true;
+ if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject)
+ {
+ CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData());
+ CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView();
+ KeyRequest.Values.reserve(ValuesArray.Num());
+ for (CbFieldView ValueField : ValuesArray)
+ {
+ CbObjectView ValueObject = ValueField.AsObjectView();
+ Oid ValueId = ValueObject["Id"sv].AsObjectId();
+ CbFieldView RawHashField = ValueObject["RawHash"sv];
+ IoHash RawHash = RawHashField.AsBinaryAttachment();
+ if (ValueId && !RawHashField.HasError())
+ {
+ KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()});
+ }
+ }
+ }
+ }
+
+ for (const ValueData& Value : KeyRequest.Values)
+ {
+ if (Value.ValueId == Chunk.Upstream.ValueId)
+ {
+ Chunk.Upstream.ChunkId = Value.ContentId;
+ Chunk.TotalSize = Value.RawSize;
+ Chunk.TotalSizeKnown = true;
+ break;
+ }
+ }
}
- else
+
+ // Now load the ContentId from the local ContentIdStore or from the upstream
+ if (Chunk.Upstream.ChunkId != IoHash::Zero)
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
- m_CacheStats.MissCount++;
+ if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal))
+ {
+ if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown)
+ {
+ if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId))
+ {
+ Chunk.Exists = true;
+ Chunk.Source = "LOCAL"sv;
+ }
+ }
+ else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload));
+ if (Compressed)
+ {
+ if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Chunk.Value = Compressed;
+ }
+ Chunk.Exists = true;
+ Chunk.TotalSize = Compressed.GetRawSize();
+ Chunk.TotalSizeKnown = true;
+ Chunk.Source = "LOCAL"sv;
+ }
+ }
+ }
+ if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote))
+ {
+ Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy);
+ UpstreamPayloadRequests.push_back(&Chunk.Upstream);
+ }
}
}
else
{
- ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ if (Chunk.KeyRequest->Exists)
+ {
+ if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType()))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue));
+ if (Compressed)
+ {
+ if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Chunk.Value = Compressed;
+ }
+ Chunk.Exists = true;
+ Chunk.TotalSize = Compressed.GetRawSize();
+ Chunk.TotalSizeKnown = true;
+ Chunk.Source = Chunk.KeyRequest->Source;
+ Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash());
+ }
+ }
+ }
}
-
- ++RequestIndex;
}
- if (!UpstreamRequests.empty())
+ if (!UpstreamPayloadRequests.empty())
{
- const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) {
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)))
+ const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) {
+ if (Params.RawHash == Params.RawHash.Zero)
{
- m_CidStore.AddChunk(Compressed);
-
- ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})",
- Params.Request.Key.Bucket,
- Params.Request.Key.Hash,
- Params.Request.ChunkId,
- NiceBytes(Params.Value.GetSize()),
- "UPSTREAM");
-
- ZEN_ASSERT(Params.RequestIndex < Chunks.size());
- Chunks[Params.RequestIndex] = std::move(Params.Value);
-
- m_CacheStats.HitCount++;
- m_CacheStats.UpstreamHitCount++;
+ return;
}
- else
+
+ ChunkRequestData& Chunk =
+ *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream));
+ if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) ||
+ !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
{
- ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId);
- m_CacheStats.MissCount++;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value));
+ if (!Compressed || Compressed.GetRawSize() != Params.RawSize)
+ {
+ return;
+ }
+
+ if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal))
+ {
+ m_CidStore.AddChunk(Compressed);
+ }
+ if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ Chunk.Value = std::move(Compressed);
+ }
}
+ Chunk.Exists = true;
+ Chunk.TotalSize = Params.RawSize;
+ Chunk.TotalSizeKnown = true;
+ Chunk.Source = "UPSTREAM"sv;
+
+ m_CacheStats.UpstreamHitCount++;
};
- m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete));
+ m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete));
}
+}
- CbPackage RpcResponse;
- CbObjectWriter ResponseObject;
+void
+HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks,
+ zen::HttpServerRequest& HttpRequest
+ BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly))
+{
+ using namespace GetCacheChunks::detail;
- ResponseObject.BeginArray("Result"sv);
+ CbPackage RpcResponse;
+ CbObjectWriter Writer;
- for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex)
+ Writer.BeginArray("Result"sv);
+ for (ChunkRequestData& Chunk : Chunks)
{
- if (Chunks[ChunkIndex])
+#if BACKWARDS_COMPATABILITY_JAN2022
+ if (SendValueOnly)
{
- ResponseObject << ChunkRequests[ChunkIndex].ChunkId;
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
+ if (Chunk.Value)
+ {
+ Writer << Chunk.Upstream.ChunkId;
+ RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
+ }
+ else
+ {
+ Writer << IoHash::Zero;
+ }
}
else
+#endif
{
- ResponseObject << IoHash::Zero;
+ Writer.BeginObject();
+ {
+ if (Chunk.Exists)
+ {
+ Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId);
+ if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Chunk.Value));
+ }
+ else
+ {
+ Writer.AddInteger("RawSize"sv, Chunk.TotalSize);
+ }
+
+ ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})",
+ Chunk.Upstream.Key.Bucket,
+ Chunk.Upstream.Key.Hash,
+ Chunk.Upstream.ValueId,
+ NiceBytes(Chunk.TotalSize),
+ Chunk.IsRecordRequest ? "Record"sv : "Value"sv,
+ Chunk.Source);
+ m_CacheStats.HitCount++;
+ }
+ else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query))
+ {
+ ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId);
+ m_CacheStats.MissCount++;
+ }
+ }
+ Writer.EndObject();
}
}
- ResponseObject.EndArray();
+ Writer.EndArray();
- RpcResponse.SetObject(ResponseObject.Save());
+ RpcResponse.SetObject(Writer.Save());
BinaryWriter MemStream;
RpcResponse.Save(MemStream);
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ HttpRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
void