// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcache.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include #include #include #include #include #include namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { std::string_view PolicyText = QueryParams.GetValue("Policy"sv); return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } struct AttachmentCount { uint32_t New = 0; uint32_t Valid = 0; uint32_t Invalid = 0; uint32_t Total = 0; }; ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; metrics::OperationTiming::Scope $(m_HttpRequests); if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); if (Key == "$rpc") { return HandleRpcRequest(Request); } if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference return HandleCacheBucketRequest(Request, Key); } return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); if (Ref.PayloadId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { return HandleCachePayloadRequest(Request, Ref, PolicyFromURL); } return; } void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { // Query stats } break; case HttpVerb::kDelete: // Drop bucket if (m_CacheStore.DropBucket(Bucket)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } break; default: break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { HandleGetCacheRecord(Request, Ref, PolicyFromURL); } break; case HttpVerb::kPut: HandlePutCacheRecord(Request, Ref, PolicyFromURL); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { const ZenContentType AcceptType = Request.AcceptContentType(); const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); bool Success = false; ZenCacheValue ClientResultValue; if (!EnumHasAnyFlags(PolicyFromURL, CachePolicy::Query)) { return Request.WriteResponse(HttpResponseCode::OK); } if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { MissingCount += m_CidStore.ContainsChunk(AttachmentHash.AsHash()) ? 0 : 1; } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); } else { MissingCount++; } } }); Success = MissingCount == 0 || PartialRecord; if (Success) { Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } } if (Success) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; if (SkipData && AcceptType == ZenContentType::kBinary) { return Request.WriteResponse(HttpResponseCode::OK); } else { // Other types handled SkipData when constructing the ClientResultValue return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote)) { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) { bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal); const bool StoreLocal = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal); const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { Success = false; ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid compact binary object from upstream", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } if (Success && StoreLocal) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { if (StoreLocal) { auto InsertResult = m_CidStore.AddChunk(Compressed); if (InsertResult.New) { Count.New++; } } Count.Valid++; } else { ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } else if (QueryLocal) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); Count.Valid++; } } Count.Total++; }); if ((Count.Valid == Count.Total) || PartialRecord) { ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); if (StoreLocal) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; if (SkipData) { // Save a package containing only the object. CbPackage(Package.GetObject()).Save(MemStream); } else { Package.Save(MemStream); } ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { Success = false; ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } else { Success = false; ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } } if (Success) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally // metadata. AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } }); } void HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); if (ContentType == HttpContentType::kBinary) { ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } CachePolicy Policy = PolicyFromURL; CbObjectView CacheRecord(Body.Data()); std::vector ValidAttachments; int32_t TotalCount = 0; CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } TotalCount++; }); ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType), TotalCount, ValidAttachments.size()); Body.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); const bool IsPartialRecord = TotalCount != static_cast(ValidAttachments.size()); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { CbPackage Package; if (!Package.TryLoad(Body)) { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CachePolicy Policy = PolicyFromURL; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; std::vector ValidAttachments; ValidAttachments.reserve(Package.GetAttachments().size()); CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(InsertResult.DecompressedId); if (InsertResult.New) { Count.New++; } Count.Valid++; } else { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), Hash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); Count.Valid++; } Count.Total++; }); if (Count.Invalid > 0) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), Count.New, Count.Valid, Count.Total); ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); } } void HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: HandleGetCachePayload(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: HandlePutCachePayload(Request, Ref, PolicyFromURL); break; default: break; } } void HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromURL; const bool QueryUpstream = !Payload && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { m_CidStore.AddChunk(Compressed); InUpstreamCache = true; } else { ZEN_WARN("got uncompressed upstream cache value"); } } } if (!Payload) { ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, NiceBytes(Payload.Size()), ToString(Payload.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; if (InUpstreamCache) { m_CacheStats.UpstreamHitCount++; } if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } } void HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromURL); IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Body.SetContentType(Request.RequestContentType()); CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD"); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } bool HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; } std::string_view HashSegment; std::string_view PayloadSegment; std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (PayloadSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + valueid lookup HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); PayloadSegment = Key.substr(PayloadSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) { return false; } if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) { const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { return false; } } else { OutRef.PayloadId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } void HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { switch (Request.RequestVerb()) { case HttpVerb::kPost: { const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Request.WriteResponseAsync( [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { const std::string_view Method = RpcRequest["Method"sv].AsString(); if (Method == "GetCacheRecords"sv) { HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); } else if (Method == "GetCacheValues"sv) { HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); } }); } break; default: Request.WriteResponse(HttpResponseCode::BadRequest); break; } } void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbPackage RpcResponse; CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); CacheRecordPolicy BatchPolicy = CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView()); std::vector CacheKeys; std::vector CacheValues; std::vector UpstreamRequests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); for (CbFieldView KeyView : Params["CacheKeys"sv]) { CbObjectView KeyObject = KeyView.AsObjectView(); CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); } if (CacheKeys.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheValues.resize(CacheKeys.size()); for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) { ZenCacheValue CacheValue; uint32_t MissingCount = 0; uint32_t MissingReadFromUpstreamCount = 0; if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { CbObjectView CacheRecord(CacheValue.Value.Data()); CacheRecord.IterateAttachments( [this, &MissingCount, &MissingReadFromUpstreamCount, &RpcResponse, &BatchPolicy](CbFieldView AttachmentHash) { CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { // A value that is requested without the Query flag (such as None/Disable) does not count as missing, because we // didn't ask for it and thus the record is complete in its absence. if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { MissingReadFromUpstreamCount++; MissingCount++; } } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) { if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { MissingReadFromUpstreamCount++; } MissingCount++; } } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { ZEN_ASSERT(Chunk.GetSize() > 0); RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); } else { if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { MissingReadFromUpstreamCount++; } MissingCount++; } } }); } if ((!CacheValue.Value && EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::QueryRemote)) || MissingReadFromUpstreamCount != 0) { UpstreamRequests.push_back(KeyIndex); } else if (CacheValue.Value && (MissingCount == 0 || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL) {}", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(CacheValue.Value.GetContentType()), MissingCount ? "(PARTIAL)" : ""sv); CacheValues[KeyIndex] = std::move(CacheValue.Value); m_CacheStats.HitCount++; } else { if (!EnumHasAnyFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); } else { ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(PARTIAL)"sv : ""sv); m_CacheStats.MissCount++; } } ++KeyIndex; } if (!UpstreamRequests.empty()) { const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, &BatchPolicy](CacheRecordGetCompleteParams&& Params) { ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); IoBuffer CacheValue; AttachmentCount Count; if (Params.Record) { Params.Record.IterateAttachments([this, &RpcResponse, &Params, &Count, &BatchPolicy](CbFieldView HashView) { CachePolicy ValuePolicy = BatchPolicy.GetRecordPolicy(); bool FoundInUpstream = false; if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) { FoundInUpstream = true; if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { FoundInUpstream = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { auto InsertResult = m_CidStore.AddChunk(Compressed); if (InsertResult.New) { Count.New++; } } Count.Valid++; if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Compressed)); } } else { ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Params.Key.Bucket, Params.Key.Hash); Count.Invalid++; } } } if (!FoundInUpstream && EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal) && m_CidStore.ContainsChunk(HashView.AsHash())) { // We added the attachment for this Value in the local loop before calling m_UpstreamCache Count.Valid++; } Count.Total++; }); if ((Count.Valid == Count.Total) || EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::PartialRecord)) { CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); } } if (CacheValue) { ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", Params.Key.Bucket, Params.Key.Hash, NiceBytes(CacheValue.GetSize()), ToString(HttpContentType::kCbPackage), Count.New, Count.Valid, Count.Total); CacheValue.SetContentType(ZenContentType::kCbObject); CacheValues[Params.KeyIndex] = CacheValue; if (EnumHasAllFlags(BatchPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { m_CacheStore.Put(Params.Key.Bucket, Params.Key.Hash, {.Value = CacheValue}); } m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; } else { const bool IsPartial = Count.Valid != Count.Total; ZEN_DEBUG("MISS - '{}/{}' {}", Params.Key.Bucket, Params.Key.Hash, IsPartial ? "(partial)"sv : ""sv); m_CacheStats.MissCount++; } }; m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, BatchPolicy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (const IoBuffer& Value : CacheValues) { if (Value) { CbObjectView Record(Value.Data()); ResponseObject << Record; } else { ResponseObject.AddNull(); } } ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCachePayloads"); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); std::vector ChunkRequests; std::vector UpstreamRequests; std::vector Chunks; CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); for (CbFieldView RequestView : Params["ChunkRequests"sv]) { CbObjectView RequestObject = RequestView.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); const Oid PayloadId = RequestObject["ValueId"sv].AsObjectId(); const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); std::string_view PolicyText = RequestObject["Policy"sv].AsString(); const CachePolicy ChunkPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; // Note we could use emplace_back here but [Apple] LLVM-12's C++ library // can't infer a constructor like other platforms (or can't handle an // initializer list like others do). ChunkRequests.push_back({Key, ChunkId, PayloadId, RawOffset, RawSize, ChunkPolicy}); } if (ChunkRequests.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Chunks.resize(ChunkRequests.size()); // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) // is missing, load the cache record and try to find the raw hash from the ValueId. { const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { if (PayloadId) { // A valid ValueId indicates that the caller is searching for a Value in a Record // that was Put with ICacheStore::Put for (CbFieldView ValueView : Record["Values"sv]) { CbObjectView ValueObject = ValueView.AsObjectView(); const Oid Id = ValueObject["Id"sv].AsObjectId(); if (Id == PayloadId) { return ValueObject["RawHash"sv].AsHash(); } } // Legacy fields from previous version of CacheRecord serialization: if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) { const Oid Id = ValueObject["Id"sv].AsObjectId(); if (Id == PayloadId) { return ValueObject["RawHash"sv].AsHash(); } } for (CbFieldView AttachmentView : Record["Attachments"sv]) { CbObjectView AttachmentObject = AttachmentView.AsObjectView(); const Oid Id = AttachmentObject["Id"sv].AsObjectId(); if (Id == PayloadId) { return AttachmentObject["RawHash"sv].AsHash(); } } return IoHash::Zero; } else { // An invalid ValueId indicates that the caller is requesting a Value that // was Put with ICacheStore::PutValue return Record["RawHash"sv].AsHash(); } }; CacheKey CurrentKey = CacheKey::Empty; IoBuffer CurrentRecordBuffer; for (CacheChunkRequest& ChunkRequest : ChunkRequests) { if (ChunkRequest.ChunkId != IoHash::Zero) { continue; } if (ChunkRequest.Key != CurrentKey) { CurrentKey = ChunkRequest.Key; ZenCacheValue CacheValue; if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue)) { CurrentRecordBuffer = CacheValue.Value; } } if (CurrentRecordBuffer) { ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); } } } for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) { const bool QueryLocal = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryLocal); const bool QueryRemote = EnumHasAllFlags(ChunkRequest.Policy, CachePolicy::QueryRemote); if (QueryLocal) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) { ZEN_ASSERT(Chunk.GetSize() > 0); ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId, NiceBytes(Chunk.Size()), ToString(Chunk.GetContentType()), "LOCAL"); Chunks[RequestIndex] = Chunk; m_CacheStats.HitCount++; } else if (QueryRemote) { UpstreamRequests.push_back(RequestIndex); } else { ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); m_CacheStats.MissCount++; } } else { ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); } ++RequestIndex; } if (!UpstreamRequests.empty()) { const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) { m_CidStore.AddChunk(Compressed); ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId, NiceBytes(Params.Payload.GetSize()), "UPSTREAM"); ZEN_ASSERT(Params.RequestIndex < Chunks.size()); Chunks[Params.RequestIndex] = std::move(Params.Payload); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; } else { ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); m_CacheStats.MissCount++; } }; m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } CbPackage RpcResponse; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) { if (Chunks[ChunkIndex]) { ResponseObject << ChunkRequests[ChunkIndex].ChunkId; RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); } else { ResponseObject << IoHash::Zero; } } ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t TotalCount = HitCount + MissCount; const CasStoreSize CasSize = m_CidStore.CasSize(); const GcStorageSize CacheSize = m_CacheStore.StorageSize(); Cbo.BeginObject("cache"); Cbo.BeginObject("size"); Cbo << "disk" << CacheSize.DiskSize; Cbo << "memory" << CacheSize.MemorySize; Cbo.EndObject(); Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "hits" << HitCount << "misses" << MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo.EndObject(); Cbo.BeginObject("upstream"); m_UpstreamCache.GetStatus(Cbo); Cbo.EndObject(); Cbo.BeginObject("cas"); Cbo.BeginObject("size"); Cbo << "tiny" << CasSize.TinySize; Cbo << "small" << CasSize.SmallSize; Cbo << "large" << CasSize.LargeSize; Cbo << "total" << CasSize.TotalSize; Cbo.EndObject(); Cbo.EndObject(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } // namespace zen