diff options
| author | mattpetersepic <[email protected]> | 2022-02-01 08:06:36 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-02-01 08:06:36 -0700 |
| commit | 154743f2d2ff2b7163bcf8d7b76eea3e3579aaba (patch) | |
| tree | aef417b5c9a0d5502c7afdb01c4cc598071e956d /zenserver | |
| parent | Tweaked remote_build.py TTY output (diff) | |
| download | zen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.tar.xz zen-154743f2d2ff2b7163bcf8d7b76eea3e3579aaba.zip | |
Cache policy support (#47)
Add HandleRpc methods for the remaining ICacheStore requests from unreal: PutCacheValues/GetCacheValues. We now have batched versions for PutCacheRecords,GetCacheRecords,PutCacheValues,GetCacheValues,GetCacheChunks. Add support for CachePolicy flags to all of these batched methods.
* Add Batched PutCacheValues/GetCacheValues. Rename old GetCacheValues to GetCacheChunks.
* HandleRpcGetCacheRecords: Receive a CacheRecordPolicy with each key, and skipdata on attachments we already have.
* Changes to CachePolicy copied from Release-5.0 depot. Change serialization to use the key BasePolicy instead of DefaultValuePolicy.
* GetChunks: Read CacheRecords from remote if necessary to find ContentId. Implement QueryLocal, StoreLocal, and SkipData.
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 1170 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 42 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 261 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 33 |
4 files changed, 1161 insertions, 345 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index facb29e1d..49e5896d1 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -48,6 +48,13 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } +CacheRecordPolicy +LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) +{ + OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); + return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); +} + struct AttachmentCount { uint32_t New = 0; @@ -829,10 +836,18 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { HandleRpcGetCacheRecords(AsyncRequest, Object); } + else if (Method == "PutCacheValues"sv) + { + HandleRpcPutCacheValues(AsyncRequest, Package); + } else if (Method == "GetCacheValues"sv) { HandleRpcGetCacheValues(AsyncRequest, Object); } + else if (Method == "GetCacheChunks"sv) + { + HandleRpcGetCacheChunks(AsyncRequest, Object); + } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -872,7 +887,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req { return Request.WriteResponse(HttpResponseCode::BadRequest); } - CacheRecordPolicy Policy = CacheRecordPolicy::Load(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); @@ -966,7 +981,7 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack Count.Total); ZenCacheValue CacheValue; - CacheValue.Value = IoBuffer(Record.GetSize()); + CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); @@ -981,14 +996,15 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack return PutResult::Success; } +#if BACKWARDS_COMPATABILITY_JAN2022 void -HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); - CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); + CacheRecordPolicy BatchPolicy = LoadCacheRecordPolicy(Params["Policy"sv].AsObjectView()); std::vector<CacheKey> CacheKeys; std::vector<IoBuffer> CacheValues; std::vector<size_t> UpstreamRequests; @@ -1014,7 +1030,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req uint32_t MissingCount = 0; uint32_t MissingReadFromUpstreamCount = 0; - if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && + CacheValue.Value.GetContentType() == ZenContentType::kCbObject) { CbObjectView CacheRecord(CacheValue.Value.Data()); CacheRecord.IterateAttachments( @@ -1060,12 +1077,8 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req }); } - if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || - MissingReadFromUpstreamCount != 0) - { - UpstreamRequests.push_back(KeyIndex); - } - else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) + // Searching upstream is not implemented in this legacy support function + if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, @@ -1094,116 +1107,445 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req ++KeyIndex; } - if (!UpstreamRequests.empty()) + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + for (const IoBuffer& Value : CacheValues) { - const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { - ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); + if (Value) + { + CbObjectView Record(Value.Data()); + ResponseObject << Record; + } + else + { + ResponseObject.AddNull(); + } + } + ResponseObject.EndArray(); - IoBuffer CacheValue; - AttachmentCount Count; + RpcResponse.SetObject(ResponseObject.Save()); - if (Params.Record) + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} +#endif + +void +HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +{ +#if BACKWARDS_COMPATABILITY_JAN2022 + // Backwards compatability; + if (RpcRequest["Params"sv].AsObjectView()["CacheKeys"sv]) + { + return HandleRpcGetCacheRecordsLegacy(HttpRequest, RpcRequest); + } +#endif + ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); + + struct ValueRequestData + { + Oid ValueId; + IoHash ContentId; + CompressedBuffer Payload; + CachePolicy DownstreamPolicy; + bool Exists = false; + bool ReadFromUpstream = false; + }; + struct RecordRequestData + { + CacheKeyRequest Upstream; + CbObjectView RecordObject; + IoBuffer RecordCacheValue; + CacheRecordPolicy DownstreamPolicy; + std::vector<ValueRequestData> Values; + bool Complete = false; + bool UsedUpstream = false; + }; + + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<RecordRequestData> Requests; + std::vector<size_t> UpstreamIndexes; + CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); + Requests.reserve(RequestsArray.Num()); + + auto ParseValues = [](RecordRequestData& Request) { + CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); + Request.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + Request.Values.push_back({ValueId, RawHash}); + Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId); + } + } + }; + + for (CbFieldView RequestField : RequestsArray) + { + RecordRequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyObject["Bucket"sv]; + CbFieldView HashField = KeyObject["Hash"sv]; + CacheKey& Key = Request.Upstream.Key; + Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (HashField.HasError() || Key.Bucket.empty()) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); + const CacheRecordPolicy& Policy = Request.DownstreamPolicy; + + ZenCacheValue CacheValue; + bool NeedUpstreamAttachment = false; + bool FoundLocalInvalid = false; + ZenCacheValue RecordCacheValue; + + if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) + { + Request.RecordCacheValue = std::move(RecordCacheValue.Value); + if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) { - Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { - CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); - bool FoundInUpstream = false; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + FoundLocalInvalid = true; + } + else + { + Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); + ParseValues(Request); + + Request.Complete = true; + for (ValueRequestData& Value : Request.Values) + { + CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { - if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) + // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we + // didn't ask for it and thus the record is complete in its absence. + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { - FoundInUpstream = true; - if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) - { - FoundInUpstream = true; - if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) - { - auto InsertResult = m_CidStore.AddChunk(Compressed); - if (InsertResult.New) - { - Count.New++; - } - } - Count.Valid++; - - if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) - { - RpcResponse.AddAttachment(CbAttachment(Compressed)); - } - } - else + Value.Exists = true; + } + else + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + Request.Complete = false; + } + } + else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + if (m_CidStore.ContainsChunk(Value.ContentId)) + { + Value.Exists = true; + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { - ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", - HashView.AsHash(), - Params.Key.Bucket, - Params.Key.Hash); - Count.Invalid++; + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; } + Request.Complete = false; } } - if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && - m_CidStore.ContainsChunk(HashView.AsHash())) + else { - // We added the attachment for this Value in the local loop before calling m_UpstreamCache - Count.Valid++; + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) + { + ZEN_ASSERT(Chunk.GetSize() > 0); + Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); + Value.Exists = true; + } + else + { + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } } - Count.Total++; - }); + } + } + } + if (!Request.Complete) + { + bool NeedUpstreamRecord = + !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); + if (NeedUpstreamRecord || NeedUpstreamAttachment) + { + UpstreamIndexes.push_back(Requests.size() - 1); + } + } + } + if (Requests.empty()) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + + if (!UpstreamIndexes.empty()) + { + std::vector<CacheKeyRequest*> UpstreamRequests; + UpstreamRequests.reserve(UpstreamIndexes.size()); + for (size_t Index : UpstreamIndexes) + { + RecordRequestData& Request = Requests[Index]; + UpstreamRequests.push_back(&Request.Upstream); - if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) + if (Request.Values.size()) + { + // We will be returning the local object and know all the value Ids that exist in it + // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have. + CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; + CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); + for (ValueRequestData& Value : Request.Values) { - CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); + CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy); + UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; + Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } + Request.Upstream.Policy = Builder.Build(); + } + else + { + // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, + // and convert the CacheRecordPolicy to an upstream policy + Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } + } - if (CacheValue) + const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", - Params.Key.Bucket, - Params.Key.Hash, - NiceBytes(CacheValue.GetSize()), - ToString(HttpContentType::kCbPackage), - Count.New, - Count.Valid, - Count.Total); - - CacheValue.SetContentType(ZenContentType::kCbObject); - CacheValues[Params.KeyIndex] = CacheValue; - if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) + return; + } + + RecordRequestData& Request = + *reinterpret_cast<RecordRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(RecordRequestData, Upstream)); + const CacheKey& Key = Request.Upstream.Key; + if (!Request.RecordObject) + { + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); + Request.RecordObject = ObjectBuffer; + if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { - m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = {Request.RecordCacheValue}}); } - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; + ParseValues(Request); + Request.UsedUpstream = true; } - else + + Request.Complete = true; + for (ValueRequestData& Value : Request.Values) { - const bool IsPartial = Count.Valid != Count.Total; - ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); - m_CacheStats.MissCount++; + if (Value.Exists) + { + continue; + } + CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + Request.Complete = false; + continue; + } + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + Request.UsedUpstream = true; + Value.Exists = true; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) + { + m_CidStore.AddChunk(Compressed); + } + if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + Value.Payload = Compressed; + } + } + else + { + ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash); + } + } + if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) + { + Request.Complete = false; + } + // Request.Complete does not need to be set to false for upstream SkipData attachments. + // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment + // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of + // any missing SkipData attachments. + } } }; - m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete)); } + CbPackage ResponsePackage; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); - for (const IoBuffer& Value : CacheValues) + for (RecordRequestData& Request : Requests) { - if (Value) + const CacheKey& Key = Request.Upstream.Key; + if (Request.Complete || + (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { - CbObjectView Record(Value.Data()); - ResponseObject << Record; + ResponseObject << Request.RecordObject; + for (ValueRequestData& Value : Request.Values) + { + if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) + { + ResponsePackage.AddAttachment(CbAttachment(Value.Payload)); + } + } + + ZEN_DEBUG("HIT - '{}/{}' {}{}{}", + Key.Bucket, + Key.Hash, + NiceBytes(Request.RecordCacheValue.Size()), + Request.Complete ? ""sv : " (PARTIAL)"sv, + Request.UsedUpstream ? " (UPSTREAM)"sv : ""sv); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount += Request.UsedUpstream ? 1 : 0; } else { ResponseObject.AddNull(); + + if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? ""sv : "(PARTIAL)"sv); + m_CacheStats.MissCount++; + } } } ResponseObject.EndArray(); + ResponsePackage.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + ResponsePackage.Save(MemStream); + + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} +void +HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) +{ + ZEN_TRACE_CPU("Z$::RpcPutCacheValues"); + CbObjectView BatchObject = BatchRequest.GetObject(); + + CbObjectView Params = BatchObject["Params"sv].AsObjectView(); + + ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); + + std::string_view PolicyText = Params["DefaultPolicy"].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + std::vector<bool> Results; + for (CbFieldView RequestField : Params["Requests"sv]) + { + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyView["Bucket"sv]; + CbFieldView HashField = KeyView["Hash"sv]; + CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + PolicyText = RequestObject["Policy"sv].AsString(); + CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; + IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); + bool Succeeded = false; + uint64_t TransferredSize = 0; + + if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + // TODO: Implement upstream puts of CacheValues with StoreLocal == false. + // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. + Policy |= CachePolicy::StoreLocal; + } + + if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) + { + IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); + Value.SetContentType(ZenContentType::kCompressedBinary); + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value}); + TransferredSize = Chunk.GetCompressedSize(); + } + Succeeded = true; + } + else + { + ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash); + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + } + else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + ZenCacheValue ExistingValue; + if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) + { + Succeeded = true; + } + } + // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. + // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. + + if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) + { + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = Key}); + } + Results.push_back(Succeeded); + ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); + } + if (Results.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (bool Value : Results) + { + ResponseObject.AddBool(Value); + } + ResponseObject.EndArray(); + + CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; @@ -1215,216 +1557,610 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { - ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - - ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); +#if BACKWARDS_COMPATABILITY_JAN2022 + if (RpcRequest["Params"sv].AsObjectView()["ChunkRequests"]) + { + return HandleRpcGetCacheChunks(HttpRequest, RpcRequest); + } +#endif - std::vector<CacheChunkRequest> ChunkRequests; - std::vector<size_t> UpstreamRequests; - std::vector<IoBuffer> Chunks; - CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); - for (CbFieldView RequestView : Params["ChunkRequests"sv]) + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + struct RequestData { - CbObjectView RequestObject = RequestView.AsObjectView(); - CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); - const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); - const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); - const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); - const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); - const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); - std::string_view PolicyText = RequestObject["Policy"sv].AsString(); - const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; + CacheKey Key; + CachePolicy Policy; + CompressedBuffer Result; + }; + std::vector<RequestData> Requests; - // Note we could use emplace_back here but [Apple] LLVM-12's C++ library - // can't infer a constructor like other platforms (or can't handle an - // initializer list like others do). - ChunkRequests.push_back({Key, ChunkId, ValueId, RawOffset, RawSize, ChunkPolicy}); - } + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); - if (ChunkRequests.empty()) + for (CbFieldView RequestField : Params["Requests"sv]) { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + RequestData& Request = Requests.emplace_back(); + CbObjectView RequestObject = RequestField.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView BucketField = KeyObject["Bucket"sv]; + CbFieldView HashField = KeyObject["Hash"sv]; + Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); + if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty()) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + PolicyText = RequestObject["Policy"sv].AsString(); + Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; - Chunks.resize(ChunkRequests.size()); + CacheKey& Key = Request.Key; + CachePolicy Policy = Request.Policy; + CompressedBuffer& Result = Request.Result; - // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) - // is missing, load the cache record and try to find the raw hash from the ValueId. - { - const auto GetChunkIdFromValueId = [](CbObjectView Record, const Oid& ValueId) -> IoHash { - if (ValueId) + ZenCacheValue CacheValue; + std::string_view Source; + if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) + { + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { - // A valid ValueId indicates that the caller is searching for a Value in a Record - // that was Put with ICacheStore::Put - for (CbFieldView ValueView : Record["Values"sv]) + Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); + if (Result) { - CbObjectView ValueObject = ValueView.AsObjectView(); - const Oid Id = ValueObject["Id"sv].AsObjectId(); - - if (Id == ValueId) - { - return ValueObject["RawHash"sv].AsHash(); - } + Source = "LOCAL"sv; } - - // Legacy fields from previous version of CacheRecord serialization: - if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) + } + } + if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) + { + GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); + if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) + { + Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); + if (Result) { - const Oid Id = ValueObject["Id"sv].AsObjectId(); - if (Id == ValueId) + UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); + Source = "UPSTREAM"sv; + // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement + // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive + // for us to keep the data only on the upstream server. + // if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { - return ValueObject["RawHash"sv].AsHash(); + m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); } } + } + } - for (CbFieldView AttachmentView : Record["Attachments"sv]) - { - CbObjectView AttachmentObject = AttachmentView.AsObjectView(); - const Oid Id = AttachmentObject["Id"sv].AsObjectId(); + if (Result) + { + ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source); + m_CacheStats.HitCount++; + } + else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) + { + // If they requested no query, do not record this as a miss + ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); + } + else + { + ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); + m_CacheStats.MissCount++; + } + } + if (Requests.empty()) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } - if (Id == ValueId) - { - return AttachmentObject["RawHash"sv].AsHash(); - } - } - return IoHash::Zero; - } - else + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + ResponseObject.BeginArray("Result"sv); + for (const RequestData& Request : Requests) + { + ResponseObject.BeginObject(); + { + const CompressedBuffer& Result = Request.Result; + if (Result) { - // An invalid ValueId indicates that the caller is requesting a Value that - // was Put with ICacheStore::PutValue - return Record["RawHash"sv].AsHash(); + ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash())); + if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Result)); + } + else + { + ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); + } } - }; + } + ResponseObject.EndObject(); + } + ResponseObject.EndArray(); - CacheKey CurrentKey = CacheKey::Empty; - IoBuffer CurrentRecordBuffer; + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +namespace GetCacheChunks::detail { + + struct ValueData + { + Oid ValueId; + IoHash ContentId; + uint64_t RawSize; + }; + struct KeyRequestData + { + CacheKeyRequest Upstream; + IoBuffer CacheValue; + std::vector<ValueData> Values; + CachePolicy DownstreamRecordPolicy; + CachePolicy DownstreamPolicy; + std::string_view Source; + bool Exists = false; + bool HasRequest = false; + bool HasRecordRequest = false; + bool HasValueRequest = false; + bool ValuesRead = false; + }; + struct ChunkRequestData + { + CacheChunkRequest Upstream; + KeyRequestData* KeyRequest; + size_t KeyRequestIndex; + CachePolicy DownstreamPolicy; + CompressedBuffer Value; + std::string_view Source; + uint64_t TotalSize = 0; + bool Exists = false; + bool IsRecordRequest = false; + bool TotalSizeKnown = false; + }; + +} // namespace GetCacheChunks::detail + +void +HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) +{ + using namespace GetCacheChunks::detail; + + ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); + + std::vector<KeyRequestData> KeyRequests; + std::vector<ChunkRequestData> Chunks; + BACKWARDS_COMPATABILITY_JAN2022_CODE(bool SendValueOnly = false;) + if (!TryGetCacheChunks_Parse(KeyRequests, Chunks BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly), RpcRequest)) + { + return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); + } + GetCacheChunks_LoadKeys(KeyRequests); + GetCacheChunks_LoadChunks(Chunks); + GetCacheChunks_SendResults(Chunks, HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, SendValueOnly)); +} + +bool +HttpStructuredCacheService::TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, + std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, + BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest) +{ + using namespace GetCacheChunks::detail; + +#if BACKWARDS_COMPATABILITY_JAN2022 + SendValueOnly = RpcRequest["MethodVersion"sv].AsInt32() < 1; +#else + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); +#endif + + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); + CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; + + KeyRequestData* PreviousKeyRequest = nullptr; + CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); + Chunks.reserve(ChunkRequestsArray.Num()); + for (CbFieldView RequestView : ChunkRequestsArray) + { + ChunkRequestData& Chunk = Chunks.emplace_back(); + CbObjectView RequestObject = RequestView.AsObjectView(); + + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + CbFieldView HashField = KeyObject["Hash"sv]; + Chunk.Upstream.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); + if (Chunk.Upstream.Key.Bucket.empty() || HashField.HasError()) + { + ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); + return false; + } - for (CacheChunkRequest& ChunkRequest : ChunkRequests) + KeyRequestData* KeyRequest = nullptr; + if (!PreviousKeyRequest || PreviousKeyRequest->Upstream.Key < Chunk.Upstream.Key) { - if (ChunkRequest.ChunkId != IoHash::Zero) + KeyRequest = &KeyRequests.emplace_back(); + KeyRequest->Upstream.Key = Chunk.Upstream.Key; + PreviousKeyRequest = KeyRequest; + } + else if (!(Chunk.Upstream.Key < PreviousKeyRequest->Upstream.Key)) + { + KeyRequest = PreviousKeyRequest; + } + else + { + ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", + Chunk.Upstream.Key.Bucket, + Chunk.Upstream.Key.Hash, + PreviousKeyRequest->Upstream.Key.Bucket, + PreviousKeyRequest->Upstream.Key.Hash); + return false; + } + Chunk.KeyRequestIndex = std::distance(KeyRequests.data(), KeyRequest); + + Chunk.Upstream.ChunkId = RequestObject["ChunkId"sv].AsHash(); + Chunk.Upstream.ValueId = RequestObject["ValueId"sv].AsObjectId(); + Chunk.Upstream.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + Chunk.Upstream.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); + std::string_view PolicyText = RequestObject["Policy"sv].AsString(); + Chunk.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; +#if BACKWARDS_COMPATABILITY_JAN2022 + if (SendValueOnly) + { + Chunk.DownstreamPolicy = Chunk.DownstreamPolicy & (~CachePolicy::SkipData); + } +#endif + Chunk.IsRecordRequest = (bool)Chunk.Upstream.ValueId; + + if (!Chunk.IsRecordRequest || Chunk.Upstream.ChunkId == IoHash::Zero) + { + KeyRequest->DownstreamPolicy = + KeyRequest->HasRequest ? Union(KeyRequest->DownstreamPolicy, Chunk.DownstreamPolicy) : Chunk.DownstreamPolicy; + KeyRequest->HasRequest = true; + (Chunk.IsRecordRequest ? KeyRequest->HasRecordRequest : KeyRequest->HasValueRequest) = true; + } + } + if (Chunks.empty()) + { + return false; + } + for (ChunkRequestData& Chunk : Chunks) + { + Chunk.KeyRequest = &KeyRequests[Chunk.KeyRequestIndex]; + } + return true; +} + +void +HttpStructuredCacheService::GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests) +{ + using namespace GetCacheChunks::detail; + + std::vector<CacheKeyRequest*> UpstreamRecordRequests; + std::vector<KeyRequestData*> UpstreamValueRequests; + for (KeyRequestData& KeyRequest : KeyRequests) + { + if (KeyRequest.HasRequest) + { + if (KeyRequest.HasRecordRequest) { - continue; + KeyRequest.DownstreamRecordPolicy = KeyRequest.DownstreamPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta; } - if (ChunkRequest.Key != CurrentKey) + if (!KeyRequest.Exists && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryLocal)) { - CurrentKey = ChunkRequest.Key; - + // There's currently no interface for checking only whether a CacheValue exists without loading it, + // so we load it here even if SkipData is true and its a CacheValue request. ZenCacheValue CacheValue; - if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue)) + if (m_CacheStore.Get(KeyRequest.Upstream.Key.Bucket, KeyRequest.Upstream.Key.Hash, CacheValue)) { - CurrentRecordBuffer = CacheValue.Value; + KeyRequest.Exists = true; + KeyRequest.CacheValue = std::move(CacheValue.Value); + KeyRequest.Source = "LOCAL"sv; } } - - if (CurrentRecordBuffer) + if (!KeyRequest.Exists) { - ChunkRequest.ChunkId = GetChunkIdFromValueId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.ValueId); + // At most one of RecordRequest or ValueRequest will succeed for the upstream request of the key a given key, but we don't + // know which, + // and if the requests (from arbitrary Unreal Class code) includes both types of request for a key, we want to ask for both + // kinds and pass the request that uses the one that succeeds. + if (KeyRequest.HasRecordRequest && EnumHasAllFlags(KeyRequest.DownstreamRecordPolicy, CachePolicy::QueryRemote)) + { + KeyRequest.Upstream.Policy = CacheRecordPolicy(ConvertToUpstream(KeyRequest.DownstreamRecordPolicy)); + UpstreamRecordRequests.push_back(&KeyRequest.Upstream); + } + if (KeyRequest.HasValueRequest && EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::QueryRemote)) + { + UpstreamValueRequests.push_back(&KeyRequest); + } } } } - for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) + if (!UpstreamRecordRequests.empty()) { - const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal); - const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote); + const auto OnCacheRecordGetComplete = [this](CacheRecordGetCompleteParams&& Params) { + if (!Params.Record) + { + return; + } - if (QueryLocal) + KeyRequestData& KeyRequest = + *reinterpret_cast<KeyRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(KeyRequestData, Upstream)); + const CacheKey& Key = KeyRequest.Upstream.Key; + KeyRequest.Exists = true; + CbObject ObjectBuffer = CbObject::Clone(Params.Record); + KeyRequest.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); + KeyRequest.CacheValue.SetContentType(ZenContentType::kCbObject); + KeyRequest.Source = "UPSTREAM"sv; + + if (EnumHasAllFlags(KeyRequest.DownstreamPolicy, CachePolicy::StoreLocal)) + { + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); + } + }; + m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); + } + + if (!UpstreamValueRequests.empty()) + { + for (KeyRequestData* KeyRequestPtr : UpstreamValueRequests) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) + KeyRequestData& KeyRequest = *KeyRequestPtr; + CacheKey& Key = KeyRequest.Upstream.Key; + GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kBinary); + if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) { - ZEN_ASSERT(Chunk.GetSize() > 0); - - ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", - ChunkRequest.Key.Bucket, - ChunkRequest.Key.Hash, - ChunkRequest.ChunkId, - NiceBytes(Chunk.Size()), - ToString(Chunk.GetContentType()), - "LOCAL"); - - Chunks[RequestIndex] = Chunk; - m_CacheStats.HitCount++; + CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); + if (Result) + { + KeyRequest.CacheValue = std::move(UpstreamResult.Value); + KeyRequest.CacheValue.SetContentType(ZenContentType::kCompressedBinary); + KeyRequest.Exists = true; + KeyRequest.Source = "UPSTREAM"sv; + // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement + // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive + // for us to keep the data only on the upstream server. + // if (EnumHasAllFlags(KeyRequest->DownstreamValuePolicy, CachePolicy::StoreLocal)) + { + m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = KeyRequest.CacheValue}); + } + } } - else if (QueryRemote) + } + } +} + +void +HttpStructuredCacheService::GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks) +{ + using namespace GetCacheChunks::detail; + + std::vector<CacheChunkRequest*> UpstreamPayloadRequests; + for (ChunkRequestData& Chunk : Chunks) + { + if (Chunk.IsRecordRequest) + { + if (Chunk.Upstream.ChunkId == IoHash::Zero) { - UpstreamRequests.push_back(RequestIndex); + // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) + // is missing, parse the cache record and try to find the raw hash from the ValueId. + KeyRequestData& KeyRequest = *Chunk.KeyRequest; + if (!KeyRequest.ValuesRead) + { + KeyRequest.ValuesRead = true; + if (KeyRequest.CacheValue && KeyRequest.CacheValue.GetContentType() == ZenContentType::kCbObject) + { + CbObjectView RecordObject = CbObjectView(KeyRequest.CacheValue.GetData()); + CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); + KeyRequest.Values.reserve(ValuesArray.Num()); + for (CbFieldView ValueField : ValuesArray) + { + CbObjectView ValueObject = ValueField.AsObjectView(); + Oid ValueId = ValueObject["Id"sv].AsObjectId(); + CbFieldView RawHashField = ValueObject["RawHash"sv]; + IoHash RawHash = RawHashField.AsBinaryAttachment(); + if (ValueId && !RawHashField.HasError()) + { + KeyRequest.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); + } + } + } + } + + for (const ValueData& Value : KeyRequest.Values) + { + if (Value.ValueId == Chunk.Upstream.ValueId) + { + Chunk.Upstream.ChunkId = Value.ContentId; + Chunk.TotalSize = Value.RawSize; + Chunk.TotalSizeKnown = true; + break; + } + } } - else + + // Now load the ContentId from the local ContentIdStore or from the upstream + if (Chunk.Upstream.ChunkId != IoHash::Zero) { - ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); - m_CacheStats.MissCount++; + if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryLocal)) + { + if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData) && Chunk.TotalSizeKnown) + { + if (m_CidStore.ContainsChunk(Chunk.Upstream.ChunkId)) + { + Chunk.Exists = true; + Chunk.Source = "LOCAL"sv; + } + } + else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Upstream.ChunkId)) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + if (Compressed) + { + if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + { + Chunk.Value = Compressed; + } + Chunk.Exists = true; + Chunk.TotalSize = Compressed.GetRawSize(); + Chunk.TotalSizeKnown = true; + Chunk.Source = "LOCAL"sv; + } + } + } + if (!Chunk.Exists && EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::QueryRemote)) + { + Chunk.Upstream.Policy = ConvertToUpstream(Chunk.DownstreamPolicy); + UpstreamPayloadRequests.push_back(&Chunk.Upstream); + } } } else { - ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + if (Chunk.KeyRequest->Exists) + { + if (Chunk.KeyRequest->CacheValue && IsCompressedBinary(Chunk.KeyRequest->CacheValue.GetContentType())) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk.KeyRequest->CacheValue)); + if (Compressed) + { + if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + { + Chunk.Value = Compressed; + } + Chunk.Exists = true; + Chunk.TotalSize = Compressed.GetRawSize(); + Chunk.TotalSizeKnown = true; + Chunk.Source = Chunk.KeyRequest->Source; + Chunk.Upstream.ChunkId = IoHash::FromBLAKE3(Compressed.GetRawHash()); + } + } + } } - - ++RequestIndex; } - if (!UpstreamRequests.empty()) + if (!UpstreamPayloadRequests.empty()) { - const auto OnCacheValueGetComplete = [this, &Chunks](CacheValueGetCompleteParams&& Params) { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value))) + const auto OnCacheValueGetComplete = [this](CacheValueGetCompleteParams&& Params) { + if (Params.RawHash == Params.RawHash.Zero) { - m_CidStore.AddChunk(Compressed); - - ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", - Params.Request.Key.Bucket, - Params.Request.Key.Hash, - Params.Request.ChunkId, - NiceBytes(Params.Value.GetSize()), - "UPSTREAM"); - - ZEN_ASSERT(Params.RequestIndex < Chunks.size()); - Chunks[Params.RequestIndex] = std::move(Params.Value); - - m_CacheStats.HitCount++; - m_CacheStats.UpstreamHitCount++; + return; } - else + + ChunkRequestData& Chunk = + *reinterpret_cast<ChunkRequestData*>(reinterpret_cast<char*>(&Params.Request) - offsetof(ChunkRequestData, Upstream)); + if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal) || + !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) { - ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); - m_CacheStats.MissCount++; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); + if (!Compressed || Compressed.GetRawSize() != Params.RawSize) + { + return; + } + + if (EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::StoreLocal)) + { + m_CidStore.AddChunk(Compressed); + } + if (!EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + { + Chunk.Value = std::move(Compressed); + } } + Chunk.Exists = true; + Chunk.TotalSize = Params.RawSize; + Chunk.TotalSizeKnown = true; + Chunk.Source = "UPSTREAM"sv; + + m_CacheStats.UpstreamHitCount++; }; - m_UpstreamCache.GetCacheValues(ChunkRequests, UpstreamRequests, std::move(OnCacheValueGetComplete)); + m_UpstreamCache.GetCacheValues(UpstreamPayloadRequests, std::move(OnCacheValueGetComplete)); } +} - CbPackage RpcResponse; - CbObjectWriter ResponseObject; +void +HttpStructuredCacheService::GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, + zen::HttpServerRequest& HttpRequest + BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly)) +{ + using namespace GetCacheChunks::detail; - ResponseObject.BeginArray("Result"sv); + CbPackage RpcResponse; + CbObjectWriter Writer; - for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) + Writer.BeginArray("Result"sv); + for (ChunkRequestData& Chunk : Chunks) { - if (Chunks[ChunkIndex]) +#if BACKWARDS_COMPATABILITY_JAN2022 + if (SendValueOnly) { - ResponseObject << ChunkRequests[ChunkIndex].ChunkId; - RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + if (Chunk.Value) + { + Writer << Chunk.Upstream.ChunkId; + RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); + } + else + { + Writer << IoHash::Zero; + } } else +#endif { - ResponseObject << IoHash::Zero; + Writer.BeginObject(); + { + if (Chunk.Exists) + { + Writer.AddHash("RawHash"sv, Chunk.Upstream.ChunkId); + if (Chunk.Value && !EnumHasAllFlags(Chunk.DownstreamPolicy, CachePolicy::SkipData)) + { + RpcResponse.AddAttachment(CbAttachment(Chunk.Value)); + } + else + { + Writer.AddInteger("RawSize"sv, Chunk.TotalSize); + } + + ZEN_DEBUG("CHUNKHIT - '{}/{}/{}' {} '{}' ({})", + Chunk.Upstream.Key.Bucket, + Chunk.Upstream.Key.Hash, + Chunk.Upstream.ValueId, + NiceBytes(Chunk.TotalSize), + Chunk.IsRecordRequest ? "Record"sv : "Value"sv, + Chunk.Source); + m_CacheStats.HitCount++; + } + else if (!EnumHasAnyFlags(Chunk.DownstreamPolicy, CachePolicy::Query)) + { + ZEN_DEBUG("CHUNKSKIP - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Chunk.Upstream.Key.Bucket, Chunk.Upstream.Key.Hash, Chunk.Upstream.ValueId); + m_CacheStats.MissCount++; + } + } + Writer.EndObject(); } } - ResponseObject.EndArray(); + Writer.EndArray(); - RpcResponse.SetObject(ResponseObject.Save()); + RpcResponse.SetObject(Writer.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); - Request.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + HttpRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 88bf6cda1..14b001e48 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -9,6 +9,10 @@ #include "monitoring/httpstatus.h" #include <memory> +#include <vector> + +// Include the define for BACKWARDS_COMPATABILITY_JAN2022 +#include <zenutil/cache/cachepolicy.h> namespace spdlog { class logger; @@ -25,6 +29,11 @@ class UpstreamCache; class ZenCacheStore; enum class CachePolicy : uint32_t; +namespace GetCacheChunks::detail { + struct KeyRequestData; + struct ChunkRequestData; +} // namespace GetCacheChunks::detail + /** * Structured cache service. Imposes constraints on keys, supports blobs and * structured values @@ -99,12 +108,25 @@ private: void HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL); void HandleRpcRequest(zen::HttpServerRequest& Request); void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); - void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); - void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; - virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; - PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); +#if BACKWARDS_COMPATABILITY_JAN2022 + void HandleRpcGetCacheRecordsLegacy(zen::HttpServerRequest& Request, CbObjectView BatchRequest); +#endif + void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest); + void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); + virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; + PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package); + + bool TryGetCacheChunks_Parse(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests, + std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, + BACKWARDS_COMPATABILITY_JAN2022_CODE(bool& SendValueOnly, ) CbObjectView RpcRequest); + void GetCacheChunks_LoadKeys(std::vector<GetCacheChunks::detail::KeyRequestData>& KeyRequests); + void GetCacheChunks_LoadChunks(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks); + void GetCacheChunks_SendResults(std::vector<GetCacheChunks::detail::ChunkRequestData>& Chunks, + zen::HttpServerRequest& HttpRequest BACKWARDS_COMPATABILITY_JAN2022_CODE(, bool SendValueOnly)); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; @@ -119,4 +141,12 @@ private: CacheStats m_CacheStats; }; +/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. + * We need this until the content type is preserved for kCompressedBinary when passing to and from upstream servers. */ +inline bool +IsCompressedBinary(ZenContentType Type) +{ + return Type == ZenContentType::kBinary || Type == ZenContentType::kCompressedBinary; +} + } // namespace zen diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 657cfb729..9d3ed2f94 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -18,6 +18,7 @@ #include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "diag/logging.h" @@ -215,21 +216,16 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Horde::GetCacheRecords"); - ZEN_UNUSED(Policy); - CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; - for (size_t Index : KeyIndex) + for (CacheKeyRequest* Request : Requests) { - const CacheKey& CacheKey = CacheKeys[Index]; + const CacheKey& CacheKey = Request->Key; CbPackage Package; CbObject Record; @@ -264,7 +260,7 @@ namespace detail { } } - OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + OnComplete({.Request = *Request, .Record = Record, .Package = Package}); } return Result; @@ -301,20 +297,20 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Horde::GetCacheValues"); CloudCacheSession Session(m_Client); GetUpstreamCacheResult Result; - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - const CacheChunkRequest& Request = CacheChunkRequests[Index]; - IoBuffer Payload; + CacheChunkRequest& Request = *RequestPtr; + IoBuffer Payload; + CompressedBuffer Compressed; if (!Result.Error) { const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); @@ -323,9 +319,23 @@ namespace detail { AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + if (Payload && IsCompressedBinary(Payload.GetContentType())) + { + Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); + } } - OnComplete({.Request = Request, .RequestIndex = Index, .Value = Payload}); + if (Compressed) + { + OnComplete({.Request = Request, + .RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()), + .RawSize = Compressed.GetRawSize(), + .Value = Payload}); + } + else + { + OnComplete({.Request = Request, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); + } } return Result; @@ -620,15 +630,10 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override { ZEN_TRACE_CPU("Upstream::Zen::GetCacheRecords"); - - std::vector<size_t> IndexMap; - IndexMap.reserve(KeyIndex.size()); + ZEN_ASSERT(Requests.size() > 0); CbObjectWriter BatchRequest; BatchRequest << "Method"sv @@ -636,21 +641,30 @@ namespace detail { BatchRequest.BeginObject("Params"sv); { - BatchRequest.BeginArray("CacheKeys"sv); - for (size_t Index : KeyIndex) - { - const CacheKey& Key = CacheKeys[Index]; - IndexMap.push_back(Index); + CachePolicy DefaultPolicy = Requests[0]->Policy.GetRecordPolicy(); + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy); + BatchRequest.BeginArray("Requests"sv); + for (CacheKeyRequest* Request : Requests) + { BatchRequest.BeginObject(); - BatchRequest << "Bucket"sv << Key.Bucket; - BatchRequest << "Hash"sv << Key.Hash; + { + const CacheKey& Key = Request->Key; + BatchRequest.BeginObject("Key"sv); + { + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + } + BatchRequest.EndObject(); + if (!Request->Policy.IsUniform() || Request->Policy.GetRecordPolicy() != DefaultPolicy) + { + BatchRequest.SetName("Policy"sv); + Request->Policy.Save(BatchRequest); + } + } BatchRequest.EndObject(); } BatchRequest.EndArray(); - - BatchRequest.SetName("Policy"sv); - Policy.Save(BatchRequest); } BatchRequest.EndObject(); @@ -668,19 +682,27 @@ namespace detail { { if (BatchResponse.TryLoad(Result.Response)) { - for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (Results.Num() != Requests.size()) { - const size_t Index = IndexMap[LocalIndex++]; - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + ZEN_WARN("Upstream::Zen::GetCacheRecords invalid number of Requests from Upstream."); } + else + { + for (size_t Index = 0; CbFieldView Record : Results) + { + CacheKeyRequest* Request = Requests[Index++]; + OnComplete({.Request = *Request, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } } } - for (size_t Index : KeyIndex) + for (CacheKeyRequest* Request : Requests) { - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -717,27 +739,28 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::Zen::GetCacheValues"); - - std::vector<size_t> IndexMap; - IndexMap.reserve(RequestIndex.size()); + ZEN_ASSERT(!CacheChunkRequests.empty()); CbObjectWriter BatchRequest; BatchRequest << "Method"sv - << "GetCacheValues"; + << "GetCacheChunks"; +#if BACKWARDS_COMPATABILITY_JAN2022 + BatchRequest.AddInteger("MethodVersion"sv, 1); +#endif BatchRequest.BeginObject("Params"sv); { + CachePolicy DefaultPolicy = CacheChunkRequests[0]->Policy; + BatchRequest << "DefaultPolicy"sv << WriteToString<128>(DefaultPolicy).ToView(); BatchRequest.BeginArray("ChunkRequests"sv); { - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - const CacheChunkRequest& Request = CacheChunkRequests[Index]; - IndexMap.push_back(Index); + const CacheChunkRequest& Request = *RequestPtr; BatchRequest.BeginObject(); { @@ -745,11 +768,26 @@ namespace detail { BatchRequest << "Bucket"sv << Request.Key.Bucket; BatchRequest << "Hash"sv << Request.Key.Hash; BatchRequest.EndObject(); - BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); - BatchRequest << "ChunkId"sv << Request.ChunkId; - BatchRequest << "RawOffset"sv << Request.RawOffset; - BatchRequest << "RawSize"sv << Request.RawSize; - BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + if (Request.ValueId) + { + BatchRequest.AddObjectId("ValueId"sv, Request.ValueId); + } + if (Request.ChunkId != Request.ChunkId.Zero) + { + BatchRequest << "ChunkId"sv << Request.ChunkId; + } + if (Request.RawOffset != 0) + { + BatchRequest << "RawOffset"sv << Request.RawOffset; + } + if (Request.RawSize != UINT64_MAX) + { + BatchRequest << "RawSize"sv << Request.RawSize; + } + if (Request.Policy != DefaultPolicy) + { + BatchRequest << "Policy"sv << WriteToString<128>(Request.Policy).ToView(); + } } BatchRequest.EndObject(); } @@ -772,29 +810,56 @@ namespace detail { { if (BatchResponse.TryLoad(Result.Response)) { - for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + CbArrayView Results = BatchResponse.GetObject()["Result"sv].AsArrayView(); + if (CacheChunkRequests.size() != Results.Num()) { - const size_t Index = IndexMap[LocalIndex++]; - IoBuffer Payload; - - if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + ZEN_WARN("Upstream::Zen::GetCacheValues invalid number of Requests from Upstream."); + } + else + { + for (size_t RequestIndex = 0; CbFieldView ChunkField : Results) { - if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + CacheChunkRequest& Request = *CacheChunkRequests[RequestIndex++]; + CbObjectView ChunkObject = ChunkField.AsObjectView(); + IoHash RawHash = ChunkObject["RawHash"sv].AsHash(); + IoBuffer Payload; + uint64_t RawSize = 0; + if (RawHash != IoHash::Zero) { - Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + bool Success = false; + const CbAttachment* Attachment = BatchResponse.FindAttachment(RawHash); + if (Attachment) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCompressedBinary); + RawSize = Compressed.GetRawSize(); + Success = true; + } + } + if (!Success) + { + CbFieldView RawSizeField = ChunkObject["RawSize"sv]; + RawSize = RawSizeField.AsUInt64(); + Success = !RawSizeField.HasError(); + } + if (!Success) + { + RawHash = IoHash::Zero; + } } + OnComplete({.Request = Request, .RawHash = RawHash, .RawSize = RawSize, .Value = std::move(Payload)}); } - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = std::move(Payload)}); + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } - - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } - for (size_t Index : RequestIndex) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; @@ -1045,21 +1110,16 @@ public: return {}; } - virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& DownstreamPolicy, - OnCacheRecordGetComplete&& OnComplete) override final + virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - std::vector<size_t> RemainingKeys(KeyIndex.begin(), KeyIndex.end()); + std::vector<CacheKeyRequest*> RemainingKeys(Requests.begin(), Requests.end()); if (m_Options.ReadUpstream) { - CacheRecordPolicy UpstreamPolicy = DownstreamPolicy.ConvertToUpstream(); - for (auto& Endpoint : m_Endpoints) { if (RemainingKeys.empty()) @@ -1072,25 +1132,24 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<size_t> Missing; - GetUpstreamCacheResult Result; + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheKeyRequest*> Missing; + GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - Result = - Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, UpstreamPolicy, [&](CacheRecordGetCompleteParams&& Params) { - if (Params.Record) - { - OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + Result = Endpoint->GetCacheRecords(RemainingKeys, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); - Stats.CacheHitCount.Increment(1); - } - else - { - Missing.push_back(Params.KeyIndex); - } - }); + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(&Params.Request); + } + }); } Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); @@ -1110,21 +1169,19 @@ public: } } - for (size_t Index : RemainingKeys) + for (CacheKeyRequest* Request : RemainingKeys) { - OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + OnComplete({.Request = *Request, .Record = CbObjectView(), .Package = CbPackage()}); } } - virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) override final + virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) override final { ZEN_TRACE_CPU("Upstream::GetCacheValues"); std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); - std::vector<size_t> RemainingKeys(RequestIndex.begin(), RequestIndex.end()); + std::vector<CacheChunkRequest*> RemainingKeys(CacheChunkRequests.begin(), CacheChunkRequests.end()); if (m_Options.ReadUpstream) { @@ -1140,14 +1197,14 @@ public: continue; } - UpstreamEndpointStats& Stats = Endpoint->Stats(); - std::vector<size_t> Missing; - GetUpstreamCacheResult Result; + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<CacheChunkRequest*> Missing; + GetUpstreamCacheResult Result; { metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); - Result = Endpoint->GetCacheValues(CacheChunkRequests, RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { - if (Params.Value) + Result = Endpoint->GetCacheValues(RemainingKeys, [&](CacheValueGetCompleteParams&& Params) { + if (Params.RawHash != Params.RawHash.Zero) { OnComplete(std::forward<CacheValueGetCompleteParams>(Params)); @@ -1155,7 +1212,7 @@ public: } else { - Missing.push_back(Params.RequestIndex); + Missing.push_back(&Params.Request); } }); } @@ -1177,9 +1234,9 @@ public: } } - for (size_t Index : RemainingKeys) + for (CacheChunkRequest* RequestPtr : CacheChunkRequests) { - OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Value = IoBuffer()}); + OnComplete({.Request = *RequestPtr, .RawHash = IoHash::Zero, .RawSize = 0, .Value = IoBuffer()}); } } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 2087b1fba..994129fc4 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -2,6 +2,8 @@ #pragma once +#include <zencore/compactbinary.h> +#include <zencore/compress.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/stats.h> @@ -12,10 +14,10 @@ #include <chrono> #include <functional> #include <memory> +#include <vector> namespace zen { -class CbObjectView; class CbPackage; class CbObjectWriter; class CidStore; @@ -65,8 +67,7 @@ struct PutUpstreamCacheResult struct CacheRecordGetCompleteParams { - const CacheKey& Key; - size_t KeyIndex = ~size_t(0); + CacheKeyRequest& Request; const CbObjectView& Record; const CbPackage& Package; }; @@ -75,9 +76,10 @@ using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams struct CacheValueGetCompleteParams { - const CacheChunkRequest& Request; - size_t RequestIndex{~size_t(0)}; - IoBuffer Value; + CacheChunkRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; }; using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; @@ -151,16 +153,12 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& Policy, - OnCacheRecordGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) = 0; + virtual GetUpstreamCacheResult GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheValueGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -185,16 +183,11 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; - virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, - std::span<size_t> KeyIndex, - const CacheRecordPolicy& RecordPolicy, - OnCacheRecordGetComplete&& OnComplete) = 0; + virtual void GetCacheRecords(std::span<CacheKeyRequest*> Requests, OnCacheRecordGetComplete&& OnComplete) = 0; virtual GetUpstreamCacheResult GetCacheValue(const CacheKey& CacheKey, const IoHash& ValueContentId) = 0; - virtual void GetCacheValues(std::span<CacheChunkRequest> CacheChunkRequests, - std::span<size_t> RequestIndex, - OnCacheValueGetComplete&& OnComplete) = 0; + virtual void GetCacheValues(std::span<CacheChunkRequest*> CacheChunkRequests, OnCacheValueGetComplete&& OnComplete) = 0; virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; |