aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-01-28 13:30:07 +0100
committerStefan Boberg <[email protected]>2022-01-28 13:30:07 +0100
commit31a2c8a818a904969f17d24dbec7c50dcd688638 (patch)
tree3c1e58e1cc24c86134acfd40c8bf12f22b4f16d6 /zenserver/cache/structuredcache.cpp
parentStructured cache PUTs now preserve content type for binary and compressed binary (diff)
parentCompile fix (diff)
downloadzen-31a2c8a818a904969f17d24dbec7c50dcd688638.tar.xz
zen-31a2c8a818a904969f17d24dbec7c50dcd688638.zip
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp560
1 files changed, 406 insertions, 154 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 72eabb053..facb29e1d 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -15,6 +15,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpshared.h>
#include <zenstore/cas.h>
#include <zenutil/cache/cache.h>
@@ -55,6 +56,13 @@ struct AttachmentCount
uint32_t Total = 0;
};
+struct PutRequestData
+{
+ CacheKey Key;
+ CbObjectView RecordObject;
+ CacheRecordPolicy Policy;
+};
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -134,13 +142,13 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams());
- if (Ref.PayloadId == IoHash::Zero)
+ if (Ref.ValueContentId == IoHash::Zero)
{
return HandleCacheRecordRequest(Request, Ref, PolicyFromURL);
}
else
{
- return HandleCachePayloadRequest(Request, Ref, PolicyFromURL);
+ return HandleCacheValueRequest(Request, Ref, PolicyFromURL);
}
return;
@@ -202,12 +210,15 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
const ZenContentType AcceptType = Request.AcceptContentType();
const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
- const bool QueryUpstream = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote);
bool Success = false;
- ZenCacheValue LocalCacheValue;
+ ZenCacheValue ClientResultValue;
+ if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query))
+ {
+ return Request.WriteResponse(HttpResponseCode::OK);
+ }
- if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue))
+ if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue))
{
Success = true;
@@ -216,15 +227,22 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CbPackage Package;
uint32_t MissingCount = 0;
- CbObjectView CacheRecord(LocalCacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &Package](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ CbObjectView CacheRecord(ClientResultValue.Value.Data());
+ CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
+ if (SkipData)
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1;
}
else
{
- MissingCount++;
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ MissingCount++;
+ }
}
});
@@ -232,13 +250,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (Success)
{
- Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value));
+ Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));
BinaryWriter MemStream;
Package.Save(MemStream);
- LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage);
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
}
}
}
@@ -248,21 +266,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
Ref.BucketSegment,
Ref.HashKey,
- NiceBytes(LocalCacheValue.Value.Size()),
- ToString(LocalCacheValue.Value.GetContentType()));
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()));
m_CacheStats.HitCount++;
-
- if (SkipData)
+ if (SkipData && AcceptType == ZenContentType::kBinary)
{
return Request.WriteResponse(HttpResponseCode::OK);
}
else
{
- return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value);
+ // Other types handled SkipData when constructing the ClientResultValue
+ return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
- else if (!QueryUpstream)
+ else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote))
{
ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
m_CacheStats.MissCount++;
@@ -272,9 +290,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
// Issue upstream query asynchronously in order to keep requests flowing without
// hogging I/O servicing threads with blocking work
- Request.WriteResponseAsync([this, AcceptType, SkipData, PartialRecord, Ref](HttpServerRequest& AsyncRequest) {
- bool Success = false;
- ZenCacheValue UpstreamCacheValue;
+ Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) {
+ bool Success = false;
+ const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
+ const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal);
+ const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal);
+ const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);
+ ZenCacheValue ClientResultValue;
metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);
@@ -283,8 +305,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
{
Success = true;
- UpstreamCacheValue.Value = UpstreamResult.Value;
- UpstreamCacheValue.Value.SetContentType(AcceptType);
+ ClientResultValue.Value = UpstreamResult.Value;
+ ClientResultValue.Value.SetContentType(AcceptType);
if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
{
@@ -299,30 +321,35 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
Ref.HashKey,
ToString(AcceptType));
}
+
+ // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
}
- if (Success)
+ if (Success && StoreLocal)
{
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue);
}
}
else if (AcceptType == ZenContentType::kCbPackage)
{
CbPackage Package;
- if (Package.TryLoad(UpstreamCacheValue.Value))
+ if (Package.TryLoad(ClientResultValue.Value))
{
CbObject CacheRecord = Package.GetObject();
AttachmentCount Count;
- CacheRecord.IterateAttachments([this, &Package, &Ref, &Count](CbFieldView HashView) {
+ CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
{
if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ if (StoreLocal)
{
- Count.New++;
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
}
Count.Valid++;
}
@@ -335,10 +362,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
Count.Invalid++;
}
}
- else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ else if (QueryLocal)
{
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- Count.Valid++;
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ Count.Valid++;
+ }
}
Count.Total++;
});
@@ -349,13 +379,24 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ if (StoreLocal)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ }
BinaryWriter MemStream;
- Package.Save(MemStream);
+ if (SkipData)
+ {
+ // Save a package containing only the object.
+ CbPackage(Package.GetObject()).Save(MemStream);
+ }
+ else
+ {
+ Package.Save(MemStream);
+ }
- UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- UpstreamCacheValue.Value.SetContentType(ZenContentType::kCbPackage);
+ ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
}
else
{
@@ -379,19 +420,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
Ref.BucketSegment,
Ref.HashKey,
- NiceBytes(UpstreamCacheValue.Value.Size()),
- ToString(UpstreamCacheValue.Value.GetContentType()));
+ NiceBytes(ClientResultValue.Value.Size()),
+ ToString(ClientResultValue.Value.GetContentType()));
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
- if (SkipData)
+ if (SkipData && AcceptType == ZenContentType::kBinary)
{
AsyncRequest.WriteResponse(HttpResponseCode::OK);
}
else
{
- AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value);
+ // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
+ // metadata.
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
}
}
else
@@ -468,8 +511,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -550,8 +594,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
- m_UpstreamCache.EnqueueUpstream(
- {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)});
+ m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .Key = {Ref.BucketSegment, Ref.HashKey},
+ .ValueContentIds = std::move(ValidAttachments)});
}
Request.WriteResponse(HttpResponseCode::Created);
@@ -563,16 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
switch (Request.RequestVerb())
{
case HttpVerb::kHead:
case HttpVerb::kGet:
- HandleGetCachePayload(Request, Ref, PolicyFromURL);
+ HandleGetCacheValue(Request, Ref, PolicyFromURL);
break;
case HttpVerb::kPut:
- HandlePutCachePayload(Request, Ref, PolicyFromURL);
+ HandlePutCacheValue(Request, Ref, PolicyFromURL);
break;
default:
break;
@@ -580,16 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
}
void
-HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId);
bool InUpstreamCache = false;
CachePolicy Policy = PolicyFromURL;
- const bool QueryUpstream = !Payload && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
+ const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote);
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId);
+ UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -603,9 +649,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
}
}
- if (!Payload)
+ if (!Value)
{
- ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType()));
+ ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()));
m_CacheStats.MissCount++;
return Request.WriteResponse(HttpResponseCode::NotFound);
}
@@ -613,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
- NiceBytes(Payload.Size()),
- ToString(Payload.GetContentType()),
+ Ref.ValueContentId,
+ NiceBytes(Value.Size()),
+ ToString(Value.GetContentType()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
m_CacheStats.HitCount++;
@@ -630,12 +676,12 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
}
else
{
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value);
}
}
void
-HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
+HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL)
{
// Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored
ZEN_UNUSED(PolicyFromURL);
@@ -656,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
}
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueId does not match attachment hash"sv);
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "ValueContentId does not match attachment hash"sv);
}
CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed);
@@ -666,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques
ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})",
Ref.BucketSegment,
Ref.HashKey,
- Ref.PayloadId,
+ Ref.ValueContentId,
NiceBytes(Body.Size()),
ToString(Body.GetContentType()),
Result.New ? "NEW" : "OLD");
@@ -695,13 +743,13 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
std::string_view HashSegment;
- std::string_view PayloadSegment;
+ std::string_view ValueSegment;
- std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/');
+ std::string_view::size_type ValueSplitOffset = Key.find_last_of('/');
// We know there is a slash so no need to check for npos return
- if (PayloadSplitOffset == BucketSplitOffset)
+ if (ValueSplitOffset == BucketSplitOffset)
{
// Basic cache record lookup
HashSegment = Key.substr(BucketSplitOffset + 1);
@@ -709,8 +757,8 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
else
{
// Cache record + valueid lookup
- HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1);
- PayloadSegment = Key.substr(PayloadSplitOffset + 1);
+ HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1);
+ ValueSegment = Key.substr(ValueSplitOffset + 1);
}
if (HashSegment.size() != IoHash::StringLength)
@@ -718,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
return false;
}
- if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength)
+ if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength)
{
- const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
+ const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash);
if (!IsOk)
{
@@ -729,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
else
{
- OutRef.PayloadId = IoHash::Zero;
+ OutRef.ValueContentId = IoHash::Zero;
}
const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
@@ -752,27 +800,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
const HttpContentType ContentType = Request.RequestContentType();
const HttpContentType AcceptType = Request.AcceptContentType();
- if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) ||
+ AcceptType != HttpContentType::kCbPackage)
{
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync(
- [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
- const std::string_view Method = RpcRequest["Method"sv].AsString();
- if (Method == "GetCacheRecords"sv)
- {
- HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
- }
- else if (Method == "GetCacheValues"sv)
- {
- HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
- }
- else
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- }
- });
+ Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == HttpContentType::kCbObject)
+ {
+ ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ const std::string_view Method = Object["Method"sv].AsString();
+ if (Method == "PutCacheRecords"sv)
+ {
+ HandleRpcPutCacheRecords(AsyncRequest, Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ HandleRpcGetCacheRecords(AsyncRequest, Object);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ HandleRpcGetCacheValues(AsyncRequest, Object);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
}
break;
default:
@@ -782,24 +847,154 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
void
+HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+{
+ ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
+ CbObjectView BatchObject = BatchRequest.GetObject();
+
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CachePolicy DefaultPolicy;
+
+ ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
+
+ std::string_view PolicyText = Params["DefaultPolicy"].AsString();
+ DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
+ std::vector<bool> Results;
+ for (CbFieldView RequestField : Params["Requests"sv])
+ {
+ CbObjectView RequestObject = RequestField.AsObjectView();
+ CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView();
+ CbObjectView KeyView = RecordObject["Key"sv].AsObjectView();
+ CbFieldView BucketField = KeyView["Bucket"sv];
+ CbFieldView HashField = KeyView["Hash"sv];
+ CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash());
+ if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
+ PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)};
+
+ PutResult Result = PutCacheRecord(PutRequest, &BatchRequest);
+
+ if (Result == PutResult::Invalid)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ Results.push_back(Result == PutResult::Success);
+ }
+ if (Results.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CbObjectWriter ResponseObject;
+ ResponseObject.BeginArray("Result"sv);
+ for (bool Value : Results)
+ {
+ ResponseObject.AddBool(Value);
+ }
+ ResponseObject.EndArray();
+
+ CbPackage RpcResponse;
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+HttpStructuredCacheService::PutResult
+HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
+{
+ std::vector<IoHash> ValidAttachments;
+ AttachmentCount Count;
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ ValidAttachments.emplace_back(InsertResult.DecompressedId);
+
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+ else
+ {
+ ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(HttpContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(ValueHash))
+ {
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if (Count.Invalid > 0)
+ {
+ return PutResult::Invalid;
+ }
+
+ ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)",
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ NiceBytes(TransferredSize),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue);
+
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
+ {
+ m_UpstreamCache.EnqueueUpstream(
+ {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)});
+ }
+ return PutResult::Success;
+}
+
+void
HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
CbPackage RpcResponse;
- CacheRecordPolicy Policy;
- CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
std::vector<CacheKey> CacheKeys;
std::vector<IoBuffer> CacheValues;
std::vector<size_t> UpstreamRequests;
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- Policy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView());
-
- const bool PartialRecord = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::PartialRecord);
- const bool QueryRemote = EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote);
-
for (CbFieldView KeyView : Params["CacheKeys"sv])
{
CbObjectView KeyObject = KeyView.AsObjectView();
@@ -816,44 +1011,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
{
ZenCacheValue CacheValue;
- uint32_t MissingCount = 0;
+ uint32_t MissingCount = 0;
+ uint32_t MissingReadFromUpstreamCount = 0;
- if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
{
CbObjectView CacheRecord(CacheValue.Value.Data());
- CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- ZEN_ASSERT(Chunk.GetSize() > 0);
- RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- }
- else
- {
- MissingCount++;
- }
- });
+ CacheRecord.IterateAttachments(
+ [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal))
+ {
+ // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we
+ // didn't ask for it and thus the record is complete in its absence.
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ MissingCount++;
+ }
+ }
+ else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ else
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ ZEN_ASSERT(Chunk.GetSize() > 0);
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
+ {
+ MissingReadFromUpstreamCount++;
+ }
+ MissingCount++;
+ }
+ }
+ });
}
- if (CacheValue.Value && (MissingCount == 0 || PartialRecord))
+ if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) ||
+ MissingReadFromUpstreamCount != 0)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)))
{
ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}",
Key.Bucket,
Key.Hash,
NiceBytes(CacheValue.Value.Size()),
ToString(CacheValue.Value.GetContentType()),
- MissingCount ? "(PARTIAl)" : ""sv);
+ MissingCount ? "(PARTIAL)" : ""sv);
CacheValues[KeyIndex] = std::move(CacheValue.Value);
m_CacheStats.HitCount++;
}
- else if (QueryRemote)
- {
- UpstreamRequests.push_back(KeyIndex);
- }
else
{
- ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv);
- m_CacheStats.MissCount++;
+ if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query))
+ {
+ // If they requested no query, do not record this as a miss
+ ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
}
++KeyIndex;
@@ -861,7 +1096,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (!UpstreamRequests.empty())
{
- const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialRecord](CacheRecordGetCompleteParams&& Params) {
+ const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) {
ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
IoBuffer CacheValue;
@@ -869,37 +1104,52 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
if (Params.Record)
{
- Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) {
+ CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy();
+ bool FoundInUpstream = false;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote))
{
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ FoundInUpstream = true;
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
{
- Count.New++;
- }
- Count.Valid++;
+ FoundInUpstream = true;
+ if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal))
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+ Count.Valid++;
- RpcResponse.AddAttachment(CbAttachment(Compressed));
- }
- else
- {
- ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Params.Key.Bucket,
- Params.Key.Hash);
- Count.Invalid++;
+ if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData))
+ {
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ HashView.AsHash(),
+ Params.Key.Bucket,
+ Params.Key.Hash);
+ Count.Invalid++;
+ }
}
}
- else if (m_CidStore.ContainsChunk(HashView.AsHash()))
+ if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) &&
+ m_CidStore.ContainsChunk(HashView.AsHash()))
{
+ // We added the attachment for this Value in the local loop before calling m_UpstreamCache
Count.Valid++;
}
Count.Total++;
});
- if ((Count.Valid == Count.Total) || PartialRecord)
+ if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))
{
CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
}
@@ -917,9 +1167,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
Count.Total);
CacheValue.SetContentType(ZenContentType::kCbObject);
-
CacheValues[Params.KeyIndex] = CacheValue;
- m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal))
+ {
+ m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue});
+ }
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -932,7 +1184,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
};
- m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete));
}
CbObjectWriter ResponseObject;
@@ -963,9 +1215,9 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCachePayloads");
+ ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
@@ -980,7 +1232,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
- const Oid PayloadId = RequestObject["ValueId"sv].AsObjectId();
+ const Oid ValueId = RequestObject["ValueId"sv].AsObjectId();
const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
std::string_view PolicyText = RequestObject["Policy"sv].AsString();
@@ -989,7 +1241,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
// Note we could use emplace_back here but [Apple] LLVM-12's C++ library
// can't infer a constructor like other platforms (or can't handle an
// initializer list like others do).
- ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, ChunkPolicy});
+ ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy});
}
if (ChunkRequests.empty())
@@ -1002,8 +1254,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
// Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId)
// is missing, load the cache record and try to find the raw hash from the ValueId.
{
- const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
- if (PayloadId)
+ const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash {
+ if (ValueId)
{
// A valid ValueId indicates that the caller is searching for a Value in a Record
// that was Put with ICacheStore::Put
@@ -1012,7 +1264,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView ValueObject = ValueView.AsObjectView();
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1022,7 +1274,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
{
const Oid Id = ValueObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return ValueObject["RawHash"sv].AsHash();
}
@@ -1033,7 +1285,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
CbObjectView AttachmentObject = AttachmentView.AsObjectView();
const Oid Id = AttachmentObject["Id"sv].AsObjectId();
- if (Id == PayloadId)
+ if (Id == ValueId)
{
return AttachmentObject["RawHash"sv].AsHash();
}
@@ -1071,7 +1323,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (CurrentRecordBuffer)
{
- ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId);
}
}
}
@@ -1118,8 +1370,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
if (!UpstreamRequests.empty())
{
- const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) {
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
+ const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)))
{
m_CidStore.AddChunk(Compressed);
@@ -1127,11 +1379,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
Params.Request.Key.Bucket,
Params.Request.Key.Hash,
Params.Request.ChunkId,
- NiceBytes(Params.Payload.GetSize()),
+ NiceBytes(Params.Value.GetSize()),
"UPSTREAM");
ZEN_ASSERT(Params.RequestIndex < Chunks.size());
- Chunks[Params.RequestIndex] = std::move(Params.Payload);
+ Chunks[Params.RequestIndex] = std::move(Params.Value);
m_CacheStats.HitCount++;
m_CacheStats.UpstreamHitCount++;
@@ -1143,7 +1395,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re
}
};
- m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete));
}
CbPackage RpcResponse;