diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 683 |
1 files changed, 449 insertions, 234 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 5918d5178..d39b95a1e 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -7,6 +7,7 @@ #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> +#include <zencore/enumflags.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> @@ -14,6 +15,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> #include <zenstore/cas.h> #include <zenutil/cache/cache.h> @@ -42,11 +44,8 @@ using namespace std::literals; CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); - const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); - const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); - - return QueryPolicy | StorePolicy | SkipPolicy; + std::string_view PolicyText = QueryParams.GetValue("Policy"sv); + return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } struct AttachmentCount @@ -57,6 +56,13 @@ struct AttachmentCount uint32_t Total = 0; }; +struct PutRequestData +{ + CacheKey Key; + CbObjectView RecordObject; + CacheRecordPolicy Policy; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -134,16 +140,15 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } - const auto QueryParams = Request.GetQueryParams(); - CachePolicy Policy = ParseCachePolicy(QueryParams); + CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); - if (Ref.PayloadId == IoHash::Zero) + if (Ref.ValueContentId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref, Policy); + return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { - return HandleCachePayloadRequest(Request, Ref, Policy); + return HandleCacheValueRequest(Request, Ref, PolicyFromURL); } return; @@ -180,19 +185,19 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { - HandleGetCacheRecord(Request, Ref, Policy); + HandleGetCacheRecord(Request, Ref, PolicyFromURL); } break; case HttpVerb::kPut: - HandlePutCacheRecord(Request, Ref, Policy); + HandlePutCacheRecord(Request, Ref, PolicyFromURL); break; default: break; @@ -200,18 +205,20 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - const ZenContentType AcceptType = Request.AcceptContentType(); - const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; - const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; - const bool PartialOnError = (Policy & CachePolicy::PartialOnError) == CachePolicy::PartialOnError; - const bool QueryUpstream = (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); bool Success = false; - ZenCacheValue LocalCacheValue; + ZenCacheValue ClientResultValue; + if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query)) + { + return Request.WriteResponse(HttpResponseCode::OK); + } - if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; @@ -220,14 +227,11 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CbPackage Package; uint32_t MissingCount = 0; - CbObjectView CacheRecord(LocalCacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &Package](CbFieldView AttachmentHash) { - if (SkipAttachments && MissingCount == 0) + CbObjectView CacheRecord(ClientResultValue.Value.Data()); + CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { + if (SkipData) { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) - { - MissingCount++; - } + MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1; } else { @@ -242,17 +246,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } }); - Success = MissingCount == 0 || PartialOnError; + Success = MissingCount == 0 || PartialRecord; if (Success) { - Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); - LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } } @@ -262,21 +266,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(LocalCacheValue.Value.Size()), - ToString(LocalCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; - - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + // Other types handled SkipData when constructing the ClientResultValue + return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } - else if (!QueryUpstream) + else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; @@ -286,9 +290,13 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work - Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, PartialOnError, Ref](HttpServerRequest& AsyncRequest) { - bool Success = false; - ZenCacheValue UpstreamCacheValue; + Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); + const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal); + const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal); + const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); + ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); @@ -297,8 +305,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { Success = true; - UpstreamCacheValue.Value = UpstreamResult.Value; - UpstreamCacheValue.Value.SetContentType(AcceptType); + ClientResultValue.Value = UpstreamResult.Value; + ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { @@ -313,72 +321,82 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request Ref.HashKey, ToString(AcceptType)); } + + // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } - if (Success) + if (Success && StoreLocal) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; - if (Package.TryLoad(UpstreamCacheValue.Value)) + if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; - CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, SkipAttachments](CbFieldView HashView) { + CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + if (StoreLocal) { - Count.New++; + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } } Count.Valid++; } else { - ZEN_WARN("Uncompressed payload '{}' from upstream cache record '{}/{}'", + ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } - else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) + else if (QueryLocal) { - if (!SkipAttachments) + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + Count.Valid++; } - Count.Valid++; } Count.Total++; }); - if ((Count.Valid == Count.Total) || PartialOnError) + if ((Count.Valid == Count.Total) || PartialRecord) { ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - - if (SkipAttachments) + if (StoreLocal) { - Package.Reset(); - Package.SetObject(CacheRecord); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; - Package.Save(MemStream); + if (SkipData) + { + // Save a package containing only the object. + CbPackage(Package.GetObject()).Save(MemStream); + } + else + { + Package.Save(MemStream); + } - UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - UpstreamCacheValue.Value.SetContentType(ZenContentType::kCbPackage); + ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { @@ -402,19 +420,21 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", Ref.BucketSegment, Ref.HashKey, - NiceBytes(UpstreamCacheValue.Value.Size()), - ToString(UpstreamCacheValue.Value.GetContentType())); + NiceBytes(ClientResultValue.Value.Size()), + ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; - if (SkipData) + if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { - AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); + // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally + // metadata. + AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else @@ -427,7 +447,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Body = Request.ReadPayload(); @@ -436,8 +456,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest); } - const HttpContentType ContentType = Request.RequestContentType(); - const bool StoreUpstream = (Policy & CachePolicy::StoreRemote) == CachePolicy::StoreRemote; + const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); @@ -446,7 +465,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - if (StoreUpstream) + if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); } @@ -463,6 +482,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } + CachePolicy Policy = PolicyFromURL; CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; int32_t TotalCount = 0; @@ -489,10 +509,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - if (StoreUpstream && !IsPartialRecord) + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -506,6 +527,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } + CachePolicy Policy = PolicyFromURL; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; @@ -570,10 +592,11 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request const bool IsPartialRecord = Count.Valid != Count.Total; - if (StoreUpstream && !IsPartialRecord) + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { - m_UpstreamCache.EnqueueUpstream( - {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .Key = {Ref.BucketSegment, Ref.HashKey}, + .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); @@ -585,18 +608,16 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: - { - HandleGetCachePayload(Request, Ref, Policy); - } + HandleGetCacheValue(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: - HandlePutCachePayload(Request, Ref, Policy); + HandlePutCacheValue(Request, Ref, PolicyFromURL); break; default: break; @@ -604,15 +625,17 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request } void -HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; - const bool QueryUpstream = !Payload && (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); + bool InUpstreamCache = false; + CachePolicy Policy = PolicyFromURL; + const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); + UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -621,14 +644,14 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques } else { - ZEN_WARN("got uncompressed upstream cache payload"); + ZEN_WARN("got uncompressed upstream cache value"); } } } - if (!Payload) + if (!Value) { - ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -636,9 +659,9 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - ToString(Payload.GetContentType()), + Ref.ValueContentId, + NiceBytes(Value.Size()), + ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; @@ -647,21 +670,21 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques m_CacheStats.UpstreamHitCount++; } - if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) + if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void -HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { - // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored - ZEN_UNUSED(Policy); + // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(PolicyFromURL); IoBuffer Body = Request.ReadPayload(); @@ -679,9 +702,11 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); @@ -689,7 +714,7 @@ HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Reques ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, - Ref.PayloadId, + Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD"); @@ -718,22 +743,22 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } std::string_view HashSegment; - std::string_view PayloadSegment; + std::string_view ValueSegment; - std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); + std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return - if (PayloadSplitOffset == BucketSplitOffset) + if (ValueSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { - // Cache record + payload lookup - HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); - PayloadSegment = Key.substr(PayloadSplitOffset + 1); + // Cache record + valueid lookup + HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); + ValueSegment = Key.substr(ValueSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) @@ -741,9 +766,9 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) + if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) { - const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); if (!IsOk) { @@ -752,7 +777,7 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } else { - OutRef.PayloadId = IoHash::Zero; + OutRef.ValueContentId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); @@ -775,27 +800,44 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); - if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || + AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync( - [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { - const std::string_view Method = RpcRequest["Method"sv].AsString(); - if (Method == "GetCacheRecords"sv) - { - HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); - } - else if (Method == "GetCachePayloads"sv) - { - HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); - } - else - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - } - }); + Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { + CbPackage Package; + CbObjectView Object; + CbObject ObjectBuffer; + if (ContentType == HttpContentType::kCbObject) + { + ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); + Object = ObjectBuffer; + } + else + { + Package = ParsePackageMessage(Body); + Object = Package.GetObject(); + } + const std::string_view Method = Object["Method"sv].AsString(); + if (Method == "PutCacheRecords"sv) + { + HandleRpcPutCacheRecords(AsyncRequest, Package); + } + else if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, Object); + } + else if (Method == "GetCacheValues"sv) + { + HandleRpcGetCacheValues(AsyncRequest, Object); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); } break; default: @@ -805,25 +847,154 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) } void +HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + CachePolicy DefaultPolicy; + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); + CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; + + PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); + + if (Result == PutResult::Invalid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + Results.push_back(Result == PutResult::Success); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +HttpStructuredCacheService::PutResult +HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) +{ + std::vector<IoHash> ValidAttachments; + AttachmentCount Count; + CbObjectView Record = Request.RecordObject; + uint64_t RecordObjectSize = Record.GetSize(); + uint64_t TransferredSize = RecordObjectSize; + + Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { + const IoHash ValueHash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); + + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + TransferredSize += Chunk.GetCompressedSize(); + } + else + { + ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", + Request.Key.Bucket, + Request.Key.Hash, + ToString(HttpContentType::kCbPackage), + ValueHash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(ValueHash)) + { + ValidAttachments.emplace_back(ValueHash); + Count.Valid++; + } + Count.Total++; + }); + + if (Count.Invalid > 0) + { + return PutResult::Invalid; + } + + ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", + Request.Key.Bucket, + Request.Key.Hash, + NiceBytes(TransferredSize), + Count.New, + Count.Valid, + Count.Total); + + ZenCacheValue CacheValue; + CacheValue.Value = IoBuffer(Record.GetSize()); + Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); + CacheValue.Value.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); + + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) + { + m_UpstreamCache.EnqueueUpstream( + {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); + } + return PutResult::Success; +} + +void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; - CacheRecordPolicy Policy; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); - CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); - - const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError); - const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments); - const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote); - for (CbFieldView KeyView : Params["CacheKeys"sv]) { CbObjectView KeyObject = KeyView.AsObjectView(); @@ -840,54 +1011,84 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; - uint32_t MissingCount = 0; + uint32_t MissingCount = 0; + uint32_t MissingReadFromUpstreamCount = 0; - if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); - CacheRecord.IterateAttachments([this, SkipAttachments, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { - if (SkipAttachments && MissingCount == 0) - { - if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + CacheRecord.IterateAttachments( + [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { - MissingCount++; + // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we + // didn't ask for it and thus the record is complete in its absence. + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + MissingCount++; + } } - } - else - { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { - ZEN_ASSERT(Chunk.GetSize() > 0); - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } } else { - MissingCount++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + MissingReadFromUpstreamCount++; + } + MissingCount++; + } } - } - }); + }); } - if (CacheValue.Value && (MissingCount == 0 || PartialOnError)) + if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || + MissingReadFromUpstreamCount != 0) + { + UpstreamRequests.push_back(KeyIndex); + } + else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(CacheValue.Value.GetContentType()), - MissingCount ? "(PARTIAl)" : ""sv); + MissingCount ? "(PARTIAL)" : ""sv); CacheValues[KeyIndex] = std::move(CacheValue.Value); m_CacheStats.HitCount++; } - else if (QueryRemote) - { - UpstreamRequests.push_back(KeyIndex); - } else { - ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAl)"sv : ""sv); - m_CacheStats.MissCount++; + if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); + m_CacheStats.MissCount++; + } } ++KeyIndex; @@ -895,82 +1096,95 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req if (!UpstreamRequests.empty()) { - const auto OnCacheRecordGetComplete = - [this, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) { - ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); + const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { + ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); - IoBuffer CacheValue; - AttachmentCount Count; + IoBuffer CacheValue; + AttachmentCount Count; - if (Params.Record) - { - Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) { + if (Params.Record) + { + Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { + CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); + bool FoundInUpstream = false; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) { + FoundInUpstream = true; if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) + FoundInUpstream = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { - Count.New++; + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } } Count.Valid++; - if (!SkipAttachments) + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Compressed)); } } else { - ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'", + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Params.Key.Bucket, Params.Key.Hash); Count.Invalid++; } } - else if (m_CidStore.ContainsChunk(HashView.AsHash())) - { - Count.Valid++; - } - Count.Total++; - }); - - if ((Count.Valid == Count.Total) || PartialOnError) + } + if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && + m_CidStore.ContainsChunk(HashView.AsHash())) { - CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); + // We added the attachment for this Value in the local loop before calling m_UpstreamCache + Count.Valid++; } - } + Count.Total++; + }); - if (CacheValue) + if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", - Params.Key.Bucket, - Params.Key.Hash, - NiceBytes(CacheValue.GetSize()), - ToString(HttpContentType::kCbPackage), - Count.New, - Count.Valid, - Count.Total); - - CacheValue.SetContentType(ZenContentType::kCbObject); - - CacheValues[Params.KeyIndex] = CacheValue; - m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; + CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); } - else + } + + if (CacheValue) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", + Params.Key.Bucket, + Params.Key.Hash, + NiceBytes(CacheValue.GetSize()), + ToString(HttpContentType::kCbPackage), + Count.New, + Count.Valid, + Count.Total); + + CacheValue.SetContentType(ZenContentType::kCbObject); + CacheValues[Params.KeyIndex] = CacheValue; + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - const bool IsPartial = Count.Valid != Count.Total; - ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); - m_CacheStats.MissCount++; + m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); } - }; - m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + const bool IsPartial = Count.Valid != Count.Total; + ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; @@ -1001,11 +1215,11 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCachePayloads"); + ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); std::vector<CacheChunkRequest> ChunkRequests; std::vector<size_t> UpstreamRequests; @@ -1014,19 +1228,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re for (CbFieldView RequestView : Params["ChunkRequests"sv]) { - CbObjectView RequestObject = RequestView.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); - const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); - const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); - const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); + const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; // Note we could use emplace_back here but [Apple] LLVM-12's C++ library // can't infer a constructor like other platforms (or can't handle an // initializer list like others do). - ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)}); + ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy}); } if (ChunkRequests.empty()) @@ -1036,20 +1251,20 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Chunks.resize(ChunkRequests.size()); - // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId) - // is missing, load the cache record and try to find the raw hash from the payload ID. + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, load the cache record and try to find the raw hash from the ValueId. { - const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { - if (PayloadId) + const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash { + if (ValueId) { - // A valid PayloadId indicates that the caller is searching for a Payload in a Record + // A valid ValueId indicates that the caller is searching for a Value in a Record // that was Put with ICacheStore::Put for (CbFieldView ValueView : Record["Values"sv]) { CbObjectView ValueObject = ValueView.AsObjectView(); const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1059,7 +1274,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) { const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return ValueObject["RawHash"sv].AsHash(); } @@ -1070,7 +1285,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re CbObjectView AttachmentObject = AttachmentView.AsObjectView(); const Oid Id = AttachmentObject["Id"sv].AsObjectId(); - if (Id == PayloadId) + if (Id == ValueId) { return AttachmentObject["RawHash"sv].AsHash(); } @@ -1079,7 +1294,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } else { - // An invalid PayloadId indicates that the caller is requesting a Value that + // An invalid ValueId indicates that the caller is requesting a Value that // was Put with ICacheStore::PutValue return Record["RawHash"sv].AsHash(); } @@ -1108,15 +1323,15 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (CurrentRecordBuffer) { - ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId); } } } for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) { - const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal; - const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal); + const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote); if (QueryLocal) { @@ -1155,8 +1370,8 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re if (!UpstreamRequests.empty()) { - const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value))) { m_CidStore.AddChunk(Compressed); @@ -1164,11 +1379,11 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, - NiceBytes(Params.Payload.GetSize()), + NiceBytes(Params.Value.GetSize()), "UPSTREAM"); ZEN_ASSERT(Params.RequestIndex < Chunks.size()); - Chunks[Params.RequestIndex] = std::move(Params.Payload); + Chunks[Params.RequestIndex] = std::move(Params.Value); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; @@ -1180,7 +1395,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } }; - m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete)); } CbPackage RpcResponse; |