// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcache.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include "zenstore/scrubcontext.h" #include #include #include #include #include #include #if ZEN_WITH_TESTS # include # include #endif namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { std::string_view PolicyText = QueryParams.GetValue("Policy"sv); return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } CacheRecordPolicy LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) { OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); return Policy ? std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); } struct AttachmentCount { uint32_t New = 0; uint32_t Valid = 0; uint32_t Invalid = 0; uint32_t Total = 0; }; struct PutRequestData { std::string Namespace; CacheKey Key; CbObjectView RecordObject; CacheRecordPolicy Policy; }; namespace { static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv; struct HttpRequestData { std::optional Namespace; std::optional Bucket; std::optional HashKey; std::optional ValueContentId; }; constinit AsciiSet ValidNamespaceNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789-_.ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; constinit AsciiSet ValidBucketNameCharactersSet{"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"}; std::optional GetValidNamespaceName(std::string_view Name) { if (Name.empty()) { ZEN_WARN("Namespace is invalid, empty namespace is not allowed"); return {}; } if (Name.length() > 64) { ZEN_WARN("Namespace '{}' is invalid, length exceeds 64 characters", Name); return {}; } if (!AsciiSet::HasOnly(Name, ValidNamespaceNameCharactersSet)) { ZEN_WARN("Namespace '{}' is invalid, invalid characters detected", Name); return {}; } return ToLower(Name); } std::optional GetValidBucketName(std::string_view Name) { if (Name.empty()) { ZEN_WARN("Bucket name is invalid, empty bucket name is not allowed"); return {}; } if (!AsciiSet::HasOnly(Name, ValidBucketNameCharactersSet)) { ZEN_WARN("Bucket name '{}' is invalid, invalid characters detected", Name); return {}; } return ToLower(Name); } std::optional GetValidIoHash(std::string_view Hash) { if (Hash.length() != IoHash::StringLength) { return {}; } IoHash KeyHash; if (!ParseHexBytes(Hash.data(), Hash.size(), KeyHash.Hash)) { return {}; } return KeyHash; } bool HttpRequestParseRelativeUri(std::string_view Key, HttpRequestData& Data) { std::vector Tokens; uint32_t TokenCount = zen::ForEachStrTok(Key, '/', [&](const std::string_view& Token) { Tokens.push_back(Token); return true; }); switch (TokenCount) { case 1: Data.Namespace = GetValidNamespaceName(Tokens[0]); return Data.Namespace.has_value(); case 2: { std::optional PossibleHashKey = GetValidIoHash(Tokens[1]); if (PossibleHashKey.has_value()) { // Legacy bucket/key request Data.Bucket = GetValidBucketName(Tokens[0]); if (!Data.Bucket.has_value()) { return false; } Data.HashKey = PossibleHashKey; Data.Namespace = ZenCacheStore::DefaultNamespace; return true; } Data.Namespace = GetValidNamespaceName(Tokens[0]); if (!Data.Namespace.has_value()) { return false; } Data.Bucket = GetValidBucketName(Tokens[1]); if (!Data.Bucket.has_value()) { return false; } return true; } case 3: { std::optional PossibleHashKey = GetValidIoHash(Tokens[1]); if (PossibleHashKey.has_value()) { // Legacy bucket/key/valueid request Data.Bucket = GetValidBucketName(Tokens[0]); if (!Data.Bucket.has_value()) { return false; } Data.HashKey = PossibleHashKey; Data.ValueContentId = GetValidIoHash(Tokens[2]); if (!Data.ValueContentId.has_value()) { return false; } Data.Namespace = ZenCacheStore::DefaultNamespace; return true; } Data.Namespace = GetValidNamespaceName(Tokens[0]); if (!Data.Namespace.has_value()) { return false; } Data.Bucket = GetValidBucketName(Tokens[1]); if (!Data.Bucket.has_value()) { return false; } Data.HashKey = GetValidIoHash(Tokens[2]); if (!Data.HashKey) { return false; } return true; } case 4: { Data.Namespace = GetValidNamespaceName(Tokens[0]); if (!Data.Namespace.has_value()) { return false; } Data.Bucket = GetValidBucketName(Tokens[1]); if (!Data.Bucket.has_value()) { return false; } Data.HashKey = GetValidIoHash(Tokens[2]); if (!Data.HashKey.has_value()) { return false; } Data.ValueContentId = GetValidIoHash(Tokens[3]); if (!Data.ValueContentId.has_value()) { return false; } return true; } default: return false; } } std::optional GetRpcRequestNamespace(const CbObjectView Params) { CbFieldView NamespaceField = Params["Namespace"sv]; if (!NamespaceField) { return std::string(ZenCacheStore::DefaultNamespace); } if (NamespaceField.HasError()) { return {}; } if (!NamespaceField.IsString()) { return {}; } return GetValidNamespaceName(NamespaceField.AsString()); } bool GetRpcRequestCacheKey(const CbObjectView& KeyView, CacheKey& Key) { CbFieldView BucketField = KeyView["Bucket"sv]; if (BucketField.HasError()) { return false; } if (!BucketField.IsString()) { return false; } std::optional Bucket = GetValidBucketName(BucketField.AsString()); if (!Bucket.has_value()) { return false; } CbFieldView HashField = KeyView["Hash"sv]; if (HashField.HasError()) { return false; } if (!HashField.IsHash()) { return false; } IoHash Hash = HashField.AsHash(); Key = CacheKey::Create(*Bucket, Hash); return true; } } // namespace ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { metrics::OperationTiming::Scope $(m_HttpRequests); std::string_view Key = Request.RelativeUri(); if (Key == HttpZCacheRPCPrefix) { return HandleRpcRequest(Request); } HttpRequestData RequestData; if (!HttpRequestParseRelativeUri(Key, RequestData)) { return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } if (RequestData.ValueContentId.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); ZEN_ASSERT(RequestData.Bucket.has_value()); ZEN_ASSERT(RequestData.HashKey.has_value()); CacheRef Ref = {.Namespace = RequestData.Namespace.value(), .BucketSegment = RequestData.Bucket.value(), .HashKey = RequestData.HashKey.value(), .ValueContentId = RequestData.ValueContentId.value()}; return HandleCacheValueRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); } if (RequestData.HashKey.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); ZEN_ASSERT(RequestData.Bucket.has_value()); CacheRef Ref = {.Namespace = RequestData.Namespace.value(), .BucketSegment = RequestData.Bucket.value(), .HashKey = RequestData.HashKey.value(), .ValueContentId = IoHash::Zero}; return HandleCacheRecordRequest(Request, Ref, ParseCachePolicy(Request.GetQueryParams())); } if (RequestData.Bucket.has_value()) { ZEN_ASSERT(RequestData.Namespace.has_value()); return HandleCacheBucketRequest(Request, RequestData.Namespace.value(), RequestData.Bucket.value()); } ZEN_ASSERT(RequestData.Namespace.has_value()); return HandleCacheNamespaceRequest(Request, RequestData.Namespace.value()); } void HttpStructuredCacheService::HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { // Query stats } break; case HttpVerb::kDelete: // Drop namespace { if (m_CacheStore.DropNamespace(Namespace)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } } break; default: break; } } void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { // Query stats } break; case HttpVerb::kDelete: // Drop bucket { if (m_CacheStore.DropBucket(Namespace, Bucket)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } } break; default: break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { HandleGetCacheRecord(Request, Ref, PolicyFromUrl); } break; case HttpVerb::kPut: HandlePutCacheRecord(Request, Ref, PolicyFromUrl); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { const ZenContentType AcceptType = Request.AcceptContentType(); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); bool Success = false; ZenCacheValue ClientResultValue; if (!EnumHasAnyFlags(PolicyFromUrl, CachePolicy::Query)) { return Request.WriteResponse(HttpResponseCode::OK); } if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue)) { Success = true; ZenContentType ContentType = ClientResultValue.Value.GetContentType(); if (AcceptType == ZenContentType::kCbPackage) { if (ContentType == ZenContentType::kCbObject) { CbPackage Package; uint32_t MissingCount = 0; CbObjectView CacheRecord(ClientResultValue.Value.Data()); CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) { if (SkipData) { if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash())) { MissingCount++; } } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); } else { MissingCount++; } } }); Success = MissingCount == 0 || PartialRecord; if (Success) { Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value)); BinaryWriter MemStream; Package.Save(MemStream); ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage); } } else { Success = false; } } else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType && AcceptType != ZenContentType::kBinary) { Success = false; } } if (Success) { ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (LOCAL)", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject) { return Request.WriteResponse(HttpResponseCode::OK); } else { // kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else if (!EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryRemote)) { ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } // Issue upstream query asynchronously in order to keep requests flowing without // hogging I/O servicing threads with blocking work Request.WriteResponseAsync([this, AcceptType, PolicyFromUrl, Ref](HttpServerRequest& AsyncRequest) { bool Success = false; const bool PartialRecord = EnumHasAllFlags(PolicyFromUrl, CachePolicy::PartialRecord); const bool QueryLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::QueryLocal); const bool StoreLocal = EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreLocal); const bool SkipData = EnumHasAllFlags(PolicyFromUrl, CachePolicy::SkipData); ZenCacheValue ClientResultValue; metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; ClientResultValue.Value = UpstreamResult.Value; ClientResultValue.Value.SetContentType(AcceptType); if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { Success = false; ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid compact binary object from upstream", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } // We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data } if (Success && StoreLocal) { m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ClientResultValue); } } else if (AcceptType == ZenContentType::kCbPackage) { CbPackage Package; if (Package.TryLoad(ClientResultValue.Value)) { CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) { if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash())) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { if (StoreLocal) { auto InsertResult = m_CidStore.AddChunk(Compressed); if (InsertResult.New) { Count.New++; } } Count.Valid++; } else { ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'", HashView.AsHash(), Ref.BucketSegment, Ref.HashKey); Count.Invalid++; } } else if (QueryLocal) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); Count.Valid++; } } Count.Total++; }); if ((Count.Valid == Count.Total) || PartialRecord) { ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); if (StoreLocal) { m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); } BinaryWriter MemStream; if (SkipData) { // Save a package containing only the object. CbPackage(Package.GetObject()).Save(MemStream); } else { Package.Save(MemStream); } ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage); } else { Success = false; ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } else { Success = false; ZEN_WARN("Get - '{}/{}/{}' '{}' FAILED, invalid upstream package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); } } } if (Success) { ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' (UPSTREAM)", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(ClientResultValue.Value.Size()), ToString(ClientResultValue.Value.GetContentType())); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; if (SkipData && AcceptType == ZenContentType::kBinary) { AsyncRequest.WriteResponse(HttpResponseCode::OK); } else { // Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally // metadata. AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else { ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } }); } void HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}'", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (EnumHasAllFlags(PolicyFromUrl, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid compact binary", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } CachePolicy Policy = PolicyFromUrl; CbObjectView CacheRecord(Body.Data()); std::vector ValidAttachments; int32_t TotalCount = 0; CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } TotalCount++; }); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' attachments '{}/{}' (valid/total)", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType), TotalCount, ValidAttachments.size()); Body.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body}); const bool IsPartialRecord = TotalCount != static_cast(ValidAttachments.size()); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { CbPackage Package; if (!Package.TryLoad(Body)) { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, invalid package", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CachePolicy Policy = PolicyFromUrl; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; std::vector ValidAttachments; ValidAttachments.reserve(Package.GetAttachments().size()); CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(Hash); if (InsertResult.New) { Count.New++; } Count.Valid++; } else { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), Hash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); Count.Valid++; } Count.Total++; }); if (Count.Invalid > 0) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), Count.New, Count.Valid, Count.Total); ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Ref.Namespace, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); } } void HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: HandleGetCacheValue(Request, Ref, PolicyFromUrl); break; case HttpVerb::kPut: HandlePutCacheValue(Request, Ref, PolicyFromUrl); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { Stopwatch Timer; IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromUrl; { const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { if (auto UpstreamResult = m_UpstreamCache.GetCacheValue(Ref.Namespace, {Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { m_CidStore.AddChunk(Compressed); InUpstreamCache = true; } else { ZEN_WARN("got uncompressed upstream cache value"); } } } } if (!Value) { ZEN_DEBUG("MISS - '{}/{}/{}/{}' '{}' in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType()), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Value.Size()), ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; if (InUpstreamCache) { m_CacheStats.UpstreamHitCount++; } if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromUrl); Stopwatch Timer; IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Body.SetContentType(Request.RequestContentType()); CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}/{}' {} '{}' ({}) in {}", Ref.Namespace, Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? "NEW" : "OLD", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } void HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { switch (Request.RequestVerb()) { case HttpVerb::kPost: { const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || AcceptType != HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { CbPackage Package; CbObjectView Object; CbObject ObjectBuffer; if (ContentType == HttpContentType::kCbObject) { ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); Object = ObjectBuffer; } else { Package = ParsePackageMessage(Body); Object = Package.GetObject(); } const std::string_view Method = Object["Method"sv].AsString(); if (Method == "PutCacheRecords"sv) { HandleRpcPutCacheRecords(AsyncRequest, Package); } else if (Method == "GetCacheRecords"sv) { HandleRpcGetCacheRecords(AsyncRequest, Object); } else if (Method == "PutCacheValues"sv) { HandleRpcPutCacheValues(AsyncRequest, Package); } else if (Method == "GetCacheValues"sv) { HandleRpcGetCacheValues(AsyncRequest, Object); } else if (Method == "GetCacheChunks"sv) { HandleRpcGetCacheChunks(AsyncRequest, Object); } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); } }); } break; default: Request.WriteResponse(HttpResponseCode::BadRequest); break; } } void HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); CbObjectView BatchObject = BatchRequest.GetObject(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); CachePolicy DefaultPolicy; ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { return Request.WriteResponse(HttpResponseCode::BadRequest); } DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); CacheKey Key; if (!GetRpcRequestCacheKey(KeyView, Key)) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); if (Result == PutResult::Invalid) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Results.push_back(Result == PutResult::Success); } if (Results.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { ResponseObject.AddBool(Value); } ResponseObject.EndArray(); CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } HttpStructuredCacheService::PutResult HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { std::vector ValidAttachments; AttachmentCount Count; CbObjectView Record = Request.RecordObject; uint64_t RecordObjectSize = Record.GetSize(); uint64_t TransferredSize = RecordObjectSize; Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { const IoHash ValueHash = HashView.AsHash(); if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(ValueHash); if (InsertResult.New) { Count.New++; } Count.Valid++; TransferredSize += Chunk.GetCompressedSize(); } else { ZEN_WARN("PUT - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed", Request.Namespace, Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), ValueHash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; } Count.Total++; }); if (Count.Invalid > 0) { return PutResult::Invalid; } ZEN_DEBUG("PUT - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", Request.Namespace, Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), Count.New, Count.Valid, Count.Total); ZenCacheValue CacheValue; CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Namespace = Request.Namespace, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } return PutResult::Success; } namespace impl { HttpResponseCode GetCacheRecords(ZenCacheStoreBase& CacheStore, CidStoreBase& CidStore, UpstreamCache& UpstreamCache, std::atomic_uint64_t& HitCount, std::atomic_uint64_t& UpstreamHitCount, std::atomic_uint64_t& MissCount, const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult) { size_t RequestCount = Request.Requests.size(); OutResult.Results.resize(RequestCount); std::vector UpstreamRecordIndexes; UpstreamRecordIndexes.reserve(RequestCount); struct RecordRequestData { CachePolicy RecordPolicy; bool MissingLocalRecord = true; bool UsedUpstream = false; size_t MissingValueCount = 0; }; std::vector RequestData(RequestCount); for (size_t RequestIndex = 0; RequestIndex < RequestCount; RequestIndex++) { const CacheKey& Key = Request.Requests[RequestIndex]; RecordRequestData& RecordRequestData = RequestData[RequestIndex]; std::optional& RecordResult = OutResult.Results[RequestIndex]; RecordRequestData.RecordPolicy = cacherequests::GetEffectiveRecordPolicy(Policy, RequestIndex); const bool RecordQueryLocal = EnumHasAllFlags(RecordRequestData.RecordPolicy, CachePolicy::QueryLocal); ZenCacheValue RecordCacheValue; if (RecordQueryLocal && CacheStore.Get(Request.Namespace, Key.Bucket, Key.Hash, RecordCacheValue)) { if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) { ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", Request.Namespace, Key.Bucket, Key.Hash); continue; } CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); cacherequests::CacheRecord CacheRecord; if (!CacheRecord.Parse(Record)) { ZEN_WARN("local record {}/{}/{} is corrupt, skipping", Request.Namespace, Key.Bucket, Key.Hash); continue; } RecordResult = {.Key = CacheRecord.Key}; RecordRequestData.MissingLocalRecord = false; size_t ValueCount = CacheRecord.Values.size(); RecordResult->Values.resize(ValueCount); for (size_t ValueIndex = 0; ValueIndex < ValueCount; ++ValueIndex) { const cacherequests::CacheRecordValue& Value = CacheRecord.Values[ValueIndex]; const CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, Value.Id); RecordResult->Values[ValueIndex] = {.Id = Value.Id, .RawHash = Value.RawHash, .RawSize = Value.RawSize}; const bool ValueQueryAny = EnumHasAnyFlags(ValuePolicy, CachePolicy::Query); if (!ValueQueryAny) { // A value that is requested without the Query flag counts as existing, because we // didn't ask for it and thus the record is complete in its absence. continue; } const bool ValueSkipData = EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData); if (ValueSkipData) { // Verify that chunk exist even if we skip the actual data if (CidStore.ContainsChunk(Value.RawHash)) { continue; } const bool ValueQueryRemote = EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote); if (ValueQueryRemote) { RecordRequestData.MissingValueCount++; } continue; } if (IoBuffer Chunk = CidStore.FindChunkByCid(Value.RawHash)) { if (Chunk.GetSize() == 0) { ZEN_WARN("local value {}/{}/{}/{} is zero size, skipping", Request.Namespace, Key.Bucket, Key.Hash, Value.RawHash); continue; } RecordResult->Values[ValueIndex].Body = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); continue; } const bool ValueQueryRemote = EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote); if (ValueQueryRemote) { RecordRequestData.MissingValueCount++; } } } bool RecordQueryRemote = EnumHasAllFlags(RecordRequestData.RecordPolicy, CachePolicy::QueryRemote); if ((RecordRequestData.MissingLocalRecord && RecordQueryRemote) || (RecordRequestData.MissingValueCount > 0)) { UpstreamRecordIndexes.push_back(RequestIndex); } } if (!UpstreamRecordIndexes.empty()) { cacherequests::RecordsRequestPolicy UpstreamPolicy = {.DefaultPolicy = ConvertToUpstream(Policy.DefaultPolicy) | CachePolicy::SkipMeta}; UpstreamPolicy.RecordPolicies.resize(Request.Requests.size()); for (size_t RequestIndex : UpstreamRecordIndexes) { std::optional& RecordResult = OutResult.Results[RequestIndex]; if (!RecordResult) { if (Policy.RecordPolicies[RequestIndex].has_value()) { UpstreamPolicy.RecordPolicies[RequestIndex] = Policy.RecordPolicies[RequestIndex]->ConvertToUpstream(); } continue; } const CachePolicy UpstreamBasePolicy = ConvertToUpstream(cacherequests::GetEffectiveBasePolicy(Policy, RequestIndex)) | CachePolicy::SkipMeta; CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); for (size_t ValueIndex = 0; ValueIndex < RecordResult->Values.size(); ++ValueIndex) { const cacherequests::GetCacheRecordResultValue& Value = RecordResult->Values[ValueIndex]; const CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, Value.Id); const bool ValueQueryRemote = EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote); CachePolicy ValueUpstreamPolicy = ConvertToUpstream(ValuePolicy); const bool ValueSkipData = EnumHasAnyFlags(ValuePolicy, CachePolicy::SkipData); const bool ValueUpstreamSkipData = EnumHasAnyFlags(ValueUpstreamPolicy, CachePolicy::SkipData); const bool ValueGetFromRemote = !ValueSkipData && !ValueUpstreamSkipData; const bool ValueGetFromUpstream = ValueQueryRemote && ValueGetFromRemote; if (RecordResult->Values[ValueIndex].Body || (!ValueGetFromUpstream)) { ValueUpstreamPolicy |= CachePolicy::SkipData; } if ((ValueUpstreamPolicy | CachePolicy::SkipMeta) != UpstreamBasePolicy) { Builder.AddValuePolicy(Value.Id, ValueUpstreamPolicy); } } UpstreamPolicy.RecordPolicies[RequestIndex] = Builder.Build(); } UpstreamCache.GetCacheRecords(Request, UpstreamPolicy, OutResult, UpstreamRecordIndexes); for (size_t RequestIndex : UpstreamRecordIndexes) { RecordRequestData& RecordRequestData = RequestData[RequestIndex]; std::optional& RecordResult = OutResult.Results[RequestIndex]; if (!RecordResult) { continue; } if (RecordRequestData.MissingLocalRecord) { RecordRequestData.UsedUpstream = true; if (EnumHasAllFlags(RecordRequestData.RecordPolicy, CachePolicy::StoreLocal)) { cacherequests::CacheRecord Record = {.Key = RecordResult->Key}; Record.Values.reserve(RecordResult->Values.size()); for (const cacherequests::GetCacheRecordResultValue& ResultValue : RecordResult->Values) { Record.Values.push_back({.Id = ResultValue.Id, .RawHash = ResultValue.RawHash, .RawSize = ResultValue.RawSize}); } CbObjectWriter Writer; if (!Record.Format(Writer)) { continue; } BinaryWriter MemStream; Writer.Save(MemStream); IoBuffer RecordBody(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()); CacheStore.Put(Request.Namespace, RecordResult->Key.Bucket, RecordResult->Key.Hash, {.Value = RecordBody}); } } for (cacherequests::GetCacheRecordResultValue& ValueResult : RecordResult->Values) { const CachePolicy UpstreamValuePolicy = GetEffectiveValuePolicy(UpstreamPolicy, RequestIndex, ValueResult.Id); const bool ValueQueryUpstream = EnumHasAnyFlags(UpstreamValuePolicy, CachePolicy::Query); if (!ValueQueryUpstream) { continue; } const bool ValueSkipDataUpstream = EnumHasAnyFlags(UpstreamValuePolicy, CachePolicy::SkipData); if (ValueSkipDataUpstream) { continue; } if (ValueResult.Body) { if (!RecordRequestData.MissingLocalRecord) { ZEN_ASSERT(RecordRequestData.MissingValueCount > 0); RecordRequestData.MissingValueCount--; } RecordRequestData.UsedUpstream = true; const CachePolicy ValuePolicy = cacherequests::GetEffectiveValuePolicy(Policy, RequestIndex, ValueResult.Id); if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { CidStore.AddChunk(ValueResult.Body); } continue; } if (RecordRequestData.MissingLocalRecord) { RecordRequestData.MissingValueCount++; ZEN_ASSERT(RecordRequestData.MissingValueCount <= RecordResult->Values.size()); } } } } for (size_t RequestIndex = 0; RequestIndex < RequestCount; RequestIndex++) { const CacheKey& Key = Request.Requests[RequestIndex]; std::optional& RecordResult = OutResult.Results[RequestIndex]; if (!RecordResult) { MissCount++; continue; } RecordRequestData& RecordRequestData = RequestData[RequestIndex]; const bool RecordIsPartial = RecordRequestData.MissingValueCount > 0; if (RecordIsPartial && !EnumHasAllFlags(RecordRequestData.RecordPolicy, CachePolicy::PartialRecord)) { // Clear the result record here, caller did not want any partial records ZEN_DEBUG("MISS - '{}/{}/{}' {}{}", Request.Namespace, Key.Bucket, Key.Hash, "(PARTIAL)"sv, RecordRequestData.UsedUpstream ? " (UPSTREAM)"sv : ""sv); RecordResult.reset(); MissCount++; continue; } ZEN_DEBUG("HIT - '{}/{}/{}' {}{}", Request.Namespace, Key.Bucket, Key.Hash, RecordIsPartial ? " (PARTIAL)"sv : ""sv, RecordRequestData.UsedUpstream ? " (UPSTREAM)"sv : ""sv); HitCount++; UpstreamHitCount += RecordRequestData.UsedUpstream ? 1 : 0; } return HttpResponseCode::OK; } HttpResponseCode GetCacheChunks(ZenCacheStoreBase& CacheStore, CidStoreBase& CidStore, UpstreamCache& UpstreamCache, std::atomic_uint64_t&, std::atomic_uint64_t&, std::atomic_uint64_t&, cacherequests::GetCacheChunksRequest& Request, const cacherequests::ChunksRequestPolicy& Policy, cacherequests::GetCacheChunksResult& OutResult) { auto GetChunkIdsFromCacheRecord = [&Request](size_t RequestIndex, const ZenCacheValue& RecordCacheValue) -> size_t { if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) { ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", Request.Namespace, Request.Requests[RequestIndex].Key.Bucket, Request.Requests[RequestIndex].Key.Hash); return 0; } cacherequests::CacheRecord CacheRecord; CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); if (!CacheRecord.Parse(Record)) { ZEN_WARN("local record {}/{}/{} is corrupt, skipping", Request.Namespace, Request.Requests[RequestIndex].Key.Bucket, Request.Requests[RequestIndex].Key.Hash); return 0; } size_t ChunkCount = 0; while (CacheRecord.Key == Request.Requests[RequestIndex + ChunkCount].Key) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex + ChunkCount]; auto FindIt = std::find_if( CacheRecord.Values.begin(), CacheRecord.Values.end(), [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); if (FindIt != CacheRecord.Values.end()) { ChunkRequest.ChunkId = FindIt->RawHash; ChunkRequest.RawSize = FindIt->RawSize; } ChunkCount++; } return ChunkCount; }; // Figure out which records we need, locally we just get them, upstream we request them std::vector UpstreamRecordRequestIndexes; size_t RequestCount = Request.Requests.size(); { size_t RequestIndex = 0; while (RequestIndex < RequestCount) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; // cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex]; CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); // const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); const bool GetInlineCacheValue = !HasValueId && !HasChunkId; if (GetInlineCacheValue) { RequestIndex++; continue; } if (HasChunkId) { RequestIndex++; continue; } ZenCacheValue RecordCacheValue; if (QueryLocal && CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { size_t ResolvedChunkCount = GetChunkIdsFromCacheRecord(RequestIndex, RecordCacheValue); if (ResolvedChunkCount != 0) { RequestIndex += ResolvedChunkCount; continue; } } if (QueryRemote) { UpstreamRecordRequestIndexes.push_back(RequestIndex); size_t ChunkCount = 1; while (Request.Requests[RequestIndex].Key == Request.Requests[RequestIndex + ChunkCount].Key) { ChunkCount++; } RequestIndex += ChunkCount; continue; } } } if (!UpstreamRecordRequestIndexes.empty()) { // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace}; RecordsRequest.Requests.reserve(UpstreamRecordRequestIndexes.size()); for (size_t RequestIndex : UpstreamRecordRequestIndexes) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; RecordsRequest.Requests.push_back(ChunkRequest.Key); } cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult; RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecordRequestIndexes.size()); UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); size_t ResultIndex = 0; for (size_t RequestIndex : UpstreamRecordRequestIndexes) { const std::optional& RecordResult = RecordsResult.Results[ResultIndex++]; if (!RecordResult) { continue; } ZEN_ASSERT(RecordResult->Key == Request.Requests[RequestIndex].Key); while (RecordResult->Key == Request.Requests[RequestIndex].Key) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; auto FindIt = std::find_if(RecordResult->Values.begin(), RecordResult->Values.end(), [&ChunkRequest](const cacherequests::GetCacheRecordResultValue& Value) { return Value.Id == ChunkRequest.ValueId; }); if (FindIt != RecordResult->Values.end()) { ChunkRequest.ChunkId = FindIt->RawHash; ChunkRequest.RawSize = FindIt->RawSize; } RequestIndex++; } } } // Now we should have any ChunkId we can get for all requests std::vector UpstreamChunkIndexes; { for (size_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; cacherequests::CacheValueResult& ChunkResult = OutResult.Results[RequestIndex]; CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); const bool GetInlineCacheValue = !HasValueId && !HasChunkId; if (GetInlineCacheValue) { if (QueryLocal) { ZenCacheValue RecordCacheValue; if (CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); } if (!SkipData) { ChunkResult.Body = Compressed; } ChunkResult.RawSize = Compressed.GetRawSize(); ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); continue; } } if (QueryRemote) { UpstreamChunkIndexes.push_back(RequestIndex); } continue; } if (!HasChunkId) { // Miss continue; } if (QueryLocal) { if (SkipData && ChunkRequest.RawSize != ~uint64_t(0)) { if (CidStore.ContainsChunk(ChunkRequest.ChunkId)) { // Hit ChunkResult.RawSize = ChunkRequest.RawSize; ChunkResult.RawHash = ChunkRequest.ChunkId; continue; } } ZenCacheValue RecordCacheValue; if (IoBuffer Chunk = CidStore.FindChunkByCid(ChunkRequest.ChunkId); Chunk) { // Hit CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); } if (!SkipData) { ChunkResult.Body = Compressed; } ChunkResult.RawSize = Compressed.GetRawSize(); ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); continue; } } if (QueryRemote) { UpstreamChunkIndexes.push_back(RequestIndex); } } } return HttpResponseCode::OK; } } // namespace impl void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecordsNew"); cacherequests::GetCacheRecordsRequest Request; cacherequests::RecordsRequestPolicy Policy; if (!Request.Parse(RpcRequest, Policy)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } cacherequests::GetCacheRecordsResult Result; HttpResponseCode Response = impl::GetCacheRecords(m_CacheStore, m_CidStore, m_UpstreamCache, m_CacheStats.HitCount, m_CacheStats.UpstreamHitCount, m_CacheStats.MissCount, Request, Policy, Result); if (Response != HttpResponseCode::OK) { return HttpRequest.WriteResponse(Response); } CbPackage ResponsePackage; if (!Result.Format(ResponsePackage)) { return HttpRequest.WriteResponse(HttpResponseCode::InternalServerError); } BinaryWriter MemStream; ResponsePackage.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheValues"); CbObjectView BatchObject = BatchRequest.GetObject(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { return Request.WriteResponse(HttpResponseCode::BadRequest); } std::vector Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); CacheKey Key; if (!GetRpcRequestCacheKey(KeyView, Key)) { return Request.WriteResponse(HttpResponseCode::BadRequest); } PolicyText = RequestObject["Policy"sv].AsString(); CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); bool Succeeded = false; uint64_t TransferredSize = 0; if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { // TODO: Implement upstream puts of CacheValues with StoreLocal == false. // Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. Policy |= CachePolicy::StoreLocal; } if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); m_CacheStore.Put(*Namespace, Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Namespace = *Namespace, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}/{}' {}, '{}'", *Namespace, Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? "Added"sv : "Invalid"); } if (Results.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { ResponseObject.AddBool(Value); } ResponseObject.EndArray(); CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::optional Namespace = GetRpcRequestNamespace(Params); if (!Namespace) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } struct RequestData { CacheKey Key; CachePolicy Policy; IoHash RawHash = IoHash::Zero; uint64_t RawSize = 0; CompressedBuffer Result; }; std::vector Requests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); std::vector RemoteRequestIndexes; for (CbFieldView RequestField : Params["Requests"sv]) { Stopwatch Timer; RequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); if (!GetRpcRequestCacheKey(KeyObject, Request.Key)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; CompressedBuffer& Result = Request.Result; ZenCacheValue CacheValue; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(*Namespace, Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); } } if (Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; } else if (EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { RemoteRequestIndexes.push_back(Requests.size() - 1); } else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}/{}'", *Namespace, Key.Bucket, Key.Hash); } else { ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", *Namespace, Key.Bucket, Key.Hash, "LOCAL"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; } } if (!RemoteRequestIndexes.empty()) { std::vector RequestedRecordsData; std::vector CacheValueRequests; RequestedRecordsData.reserve(RemoteRequestIndexes.size()); CacheValueRequests.reserve(RemoteRequestIndexes.size()); for (size_t Index : RemoteRequestIndexes) { RequestData& Request = Requests[Index]; RequestedRecordsData.push_back({.Key = {Request.Key.Bucket, Request.Key.Hash}, .Policy = ConvertToUpstream(Request.Policy)}); CacheValueRequests.push_back(&RequestedRecordsData.back()); } Stopwatch Timer; m_UpstreamCache.GetCacheValues( *Namespace, CacheValueRequests, [this, Namespace, &RequestedRecordsData, &Requests, &RemoteRequestIndexes, &Timer](CacheValueGetCompleteParams&& Params) { CacheValueRequest& ChunkRequest = Params.Request; if (Params.RawHash != IoHash::Zero) { size_t RequestOffset = std::distance(RequestedRecordsData.data(), &ChunkRequest); size_t RequestIndex = RemoteRequestIndexes[RequestOffset]; RequestData& Request = Requests[RequestIndex]; Request.RawHash = Params.RawHash; Request.RawSize = Params.RawSize; const bool HasData = IsCompressedBinary(Params.Value.GetContentType()); const bool SkipData = EnumHasAllFlags(Request.Policy, CachePolicy::SkipData); const bool StoreData = EnumHasAllFlags(Request.Policy, CachePolicy::StoreLocal); const bool IsHit = SkipData || HasData; if (IsHit) { if (HasData && !SkipData) { Request.Result = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); } if (HasData && StoreData) { m_CacheStore.Put(*Namespace, Request.Key.Bucket, Request.Key.Hash, ZenCacheValue{Params.Value}); } ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}/{}' {} ({}) in {}", *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, NiceBytes(Request.Result.GetCompressed().GetSize()), "UPSTREAM"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount++; return; } } ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}/{}' ({}) in {}", *Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, "UPSTREAM"sv, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); m_CacheStats.MissCount++; }); } if (Requests.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } CbPackage RpcResponse; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { ResponseObject.BeginObject(); { const CompressedBuffer& Result = Request.Result; if (Result) { ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash())); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Result)); } else { ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); } } else if (Request.RawHash != IoHash::Zero) { ResponseObject.AddHash("RawHash"sv, Request.RawHash); ResponseObject.AddInteger("RawSize"sv, Request.RawSize); } } ResponseObject.EndObject(); } ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } namespace cache::detail { struct RecordValue { Oid ValueId; IoHash ContentId; uint64_t RawSize; }; struct RecordBody { IoBuffer CacheValue; std::vector Values; std::string_view Source; CachePolicy DownstreamPolicy; bool Exists = false; bool HasRequest = false; bool ValuesRead = false; }; struct ChunkRequest { CacheChunkRequest* Key = nullptr; RecordBody* Record = nullptr; CompressedBuffer Value; std::string_view Source; uint64_t TotalSize = 0; uint64_t RequestedSize = 0; uint64_t RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; bool TotalSizeKnown = false; bool IsRecordRequest = false; }; } // namespace cache::detail static bool UseHandleRpcGetCacheChunksNew = true; void HttpStructuredCacheService::HandleRpcGetCacheChunksNew(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); cacherequests::GetCacheChunksRequest Request; cacherequests::ChunksRequestPolicy Policy; if (!Request.Parse(RpcRequest, Policy)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } cacherequests::GetCacheChunksResult Result; Result.Results.resize(Request.Requests.size()); impl::GetCacheChunks(m_CacheStore, m_CidStore, m_UpstreamCache, m_CacheStats.HitCount, m_CacheStats.UpstreamHitCount, m_CacheStats.MissCount, Request, Policy, Result); #if 0 std::vector UpstreamChunkIndexes; UpstreamChunkIndexes.reserve(Request.Requests.size()); std::vector UpstreamRecords; std::vector MissingChunksIndexes; UpstreamRecords.reserve(Request.Requests.size()); cacherequests::CacheRecord CurrentCacheRecord; // Requests should be sorted by Key.Hash - it currently does not care about bucket, but is that an issue? // Should we just start by checking for which records we need and handle that first and the run through the cache values themselves? for (size_t RequestIndex = 0; RequestIndex < Request.Requests.size(); ++RequestIndex) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[RequestIndex]; cacherequests::CacheValueResult& ChunkResult = Result.Results[RequestIndex]; CachePolicy ChunkPolicy = cacherequests::GetEffectiveChunkPolicy(Policy, RequestIndex); const bool HasValueId = ChunkRequest.ValueId != Oid::Zero; const bool HasChunkId = ChunkRequest.ChunkId != IoHash::Zero; const bool QueryLocal = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryLocal); const bool SkipData = EnumHasAnyFlags(ChunkPolicy, CachePolicy::SkipData); const bool QueryRemote = EnumHasAnyFlags(ChunkPolicy, CachePolicy::QueryRemote); const bool GetInlineCacheValue = !HasValueId && !HasChunkId; if (GetInlineCacheValue) { if (QueryLocal) { ZenCacheValue RecordCacheValue; if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); } if (!SkipData) { ChunkResult.Body = Compressed; } ChunkResult.RawSize = Compressed.GetRawSize(); ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); continue; } } if (QueryRemote) { UpstreamChunkIndexes.push_back(RequestIndex); } continue; } if (!HasChunkId) { ZEN_ASSERT_SLOW(HasValueId); if (CurrentCacheRecord.Key != ChunkRequest.Key) { CurrentCacheRecord.Values.clear(); ZenCacheValue RecordCacheValue; if (QueryLocal && m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { if (RecordCacheValue.Value.GetContentType() != ZenContentType::kCbObject) { ZEN_WARN("local record {}/{}/{} is not a structured object, skipping.", Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash); // continue; } CbObject Record = LoadCompactBinaryObject(RecordCacheValue.Value); if (!CurrentCacheRecord.Parse(Record)) { ZEN_WARN("local record {}/{}/{} is corrupt, skipping", Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash); // continue; } CurrentCacheRecord.Key = ChunkRequest.Key; } else { if (QueryRemote && (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key)) { UpstreamRecords.push_back(ChunkRequest.Key); } } } auto FindIt = std::find_if(CurrentCacheRecord.Values.begin(), CurrentCacheRecord.Values.end(), [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); if (FindIt == CurrentCacheRecord.Values.end()) { MissingChunksIndexes.push_back(RequestIndex); continue; } ZEN_ASSERT(FindIt->RawHash != IoHash::Zero); ChunkRequest.ChunkId = FindIt->RawHash; ChunkRequest.RawSize = FindIt->RawSize; } if (QueryLocal) { ZenCacheValue RecordCacheValue; if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { // Parse CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); } if (!SkipData) { ChunkResult.Body = Compressed; } ChunkResult.RawSize = Compressed.GetRawSize(); ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); continue; } } if (QueryRemote) { UpstreamChunkIndexes.push_back(RequestIndex); } } if (!UpstreamRecords.empty()) { // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords}; cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData | CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult; RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size()); m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); size_t MissingChunkOffset = 0; for (const auto& RecordResult : RecordsResult.Results) { if (RecordResult) { while (MissingChunkOffset < MissingChunksIndexes.size()) { cacherequests::GetCacheChunkRequest& ChunkRequest = Request.Requests[MissingChunksIndexes[MissingChunkOffset]]; if (ChunkRequest.Key != RecordResult->Key) { auto FindIt = std::find_if( CurrentCacheRecord.Values.begin(), CurrentCacheRecord.Values.end(), [&ChunkRequest](const cacherequests::CacheRecordValue& Value) { return Value.Id == ChunkRequest.ValueId; }); if (FindIt == CurrentCacheRecord.Values.end()) { // The ValueId is not part of the the Record, ignore it continue; } ZEN_ASSERT(FindIt->RawHash != IoHash::Zero); ChunkRequest.ChunkId = FindIt->RawHash; ChunkRequest.RawSize = FindIt->RawSize; break; } MissingChunkOffset++; } } } // Fill in for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index) { } RecordsRequest.Requests = UpstreamRecords; } // Try to fetch records locally // Fill in RawHash for any chunk request matching the found cache record // Add any RecordKeys to UpstreamRecords for any records that does not exist locally // Request all UpstreamRecords from upstream // Fill in RawHash for any chunk request matching the found cache record // Run through all requests and fetch the chunks we need where we have filled in RawHash // If chunk is not found locally, add to UpstreamChunkIndexes and try to fetch from upstream # if 0 { { // Try to get local record, if we can't find it add the chunk request to fetch from upstream - if we do, will me miss // opportunity to find payload locally? if (QueryLocal) { ZenCacheValue RecordCacheValue; if (m_CacheStore.Get(Request.Namespace, ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, RecordCacheValue)) { // Parse CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(RecordCacheValue.Value)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(RecordCacheValue.Value)); } if (!SkipData) { ChunkResult.Body = Compressed; } ChunkResult.RawSize = Compressed.GetRawSize(); ChunkResult.RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); continue; } } if (QueryRemote) { // We need the record to be able to fill this request if (UpstreamRecords.empty() || UpstreamRecords.back() != ChunkRequest.Key) { UpstreamRecords.push_back(ChunkRequest.Key); } } continue; } } # endif # if 0 { if (QueryLocal) { if (SkipData && Result.Results[RequestIndex].RawSize != 0) { if (m_CidStore.ContainsChunk(ChunkRequest.ChunkId)) { continue; } else { // Fail! Not found! Result.Results[RequestIndex].RawSize = 0; Result.Results[RequestIndex].RawHash = IoHash::Zero; } } IoBuffer Value = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId); if (Value) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Value)); if (!Compressed) { Compressed = CompressedBuffer::Compress(SharedBuffer(Value)); } if (Result.Results[RequestIndex].RawHash != IoHash::FromBLAKE3(Compressed.GetRawHash())) { // Fail! Not found! Result.Results[RequestIndex].RawSize = 0; Result.Results[RequestIndex].RawHash = IoHash::Zero; continue; } Result.Results[RequestIndex].RawSize = Compressed.GetRawSize(); if (!SkipData) { Result.Results[RequestIndex].Body = Compressed; } continue; } } if (QueryRemote) { UpstreamChunkIndexes.push_back(RequestIndex); } } # endif if (!UpstreamRecords.empty()) { // We need to do GetCacheValue for the actual record - we don't want to fetch cache records... // cacherequests::GetCacheRecordsRequest RecordsRequest = {.Namespace = Request.Namespace, .Requests = UpstreamRecords}; // cacherequests::RecordsRequestPolicy RecordsRequestPolicy = {.DefaultPolicy = Policy.DefaultPolicy | CachePolicy::SkipData //| CachePolicy::SkipMeta}; cacherequests::GetCacheRecordsResult RecordsResult; // RecordsRequestPolicy.RecordPolicies.resize(UpstreamRecords.size()); // m_UpstreamCache.GetCacheRecords(RecordsRequest, RecordsRequestPolicy, RecordsResult, {}); // // Fill in // for (size_t Index = 0; Index < Policy.ChunkPolicies.size(); ++Index) // { // // } // RecordsRequest.Requests = UpstreamRecords; } if (!UpstreamChunkIndexes.empty()) { // m_UpstreamCache.GetCacheChunks(Request.Namespace, Request, Policy, UpstreamChunkIndexes); } #endif std::string Namespace; std::vector RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream std::vector Requests; // Intermediate and result data about a ChunkRequest std::vector RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key std::vector ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); } void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { if (UseHandleRpcGetCacheChunksNew) { HandleRpcGetCacheChunksNew(HttpRequest, RpcRequest); return; } using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); std::string Namespace; std::vector RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream std::vector Requests; // Intermediate and result data about a ChunkRequest std::vector RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key std::vector ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks GetLocalCacheRecords(Namespace, RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks GetLocalCacheValues(Namespace, ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client WriteGetCacheChunksResponse(Namespace, Requests, HttpRequest); } bool HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace, std::vector& RecordKeys, std::vector& Records, std::vector& RequestKeys, std::vector& Requests, std::vector& RecordRequests, std::vector& ValueRequests, CbObjectView RpcRequest) { using namespace cache::detail; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; std::optional NamespaceText = GetRpcRequestNamespace(Params); if (!NamespaceText) { ZEN_WARN("GetCacheChunks: Invalid namespace in ChunkRequest."); return false; } Namespace = *NamespaceText; CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); size_t NumRequests = static_cast(ChunkRequestsArray.Num()); // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, // we will need to change the pointers to indexes to handle reallocations. RecordKeys.reserve(NumRequests); Records.reserve(NumRequests); RequestKeys.reserve(NumRequests); Requests.reserve(NumRequests); RecordRequests.reserve(NumRequests); ValueRequests.reserve(NumRequests); CacheKeyRequest* PreviousRecordKey = nullptr; RecordBody* PreviousRecord = nullptr; for (CbFieldView RequestView : ChunkRequestsArray) { CbObjectView RequestObject = RequestView.AsObjectView(); CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); ChunkRequest& Request = Requests.emplace_back(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); Request.Key = &RequestKey; if (!GetRpcRequestCacheKey(KeyObject, Request.Key->Key)) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; } RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); RequestKey.RawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); Request.RequestedSize = RequestKey.RawSize; Request.RequestedOffset = RequestKey.RawOffset; std::string_view PolicyText = RequestObject["Policy"sv].AsString(); Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; Request.IsRecordRequest = (bool)RequestKey.ValueId; if (!Request.IsRecordRequest) { ValueRequests.push_back(&Request); } else { RecordRequests.push_back(&Request); CacheKeyRequest* RecordKey = nullptr; RecordBody* Record = nullptr; if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) { RecordKey = &RecordKeys.emplace_back(); PreviousRecordKey = RecordKey; Record = &Records.emplace_back(); PreviousRecord = Record; RecordKey->Key = RequestKey.Key; } else if (RequestKey.Key == PreviousRecordKey->Key) { RecordKey = PreviousRecordKey; Record = PreviousRecord; } else { ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", RequestKey.Key.Bucket, RequestKey.Key.Hash, PreviousRecordKey->Key.Bucket, PreviousRecordKey->Key.Hash); return false; } Request.Record = Record; if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) { Record->DownstreamPolicy = Record->HasRequest ? Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; Record->HasRequest = true; } } } if (Requests.empty()) { return false; } return true; } void HttpStructuredCacheService::GetLocalCacheRecords(std::string_view Namespace, std::vector& RecordKeys, std::vector& Records, std::vector& RecordRequests, std::vector& OutUpstreamChunks) { using namespace cache::detail; std::vector UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; RecordBody& Record = Records[RecordIndex]; if (Record.HasRequest) { Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; if (m_CacheStore.Get(Namespace, RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); Record.Source = "LOCAL"sv; } } if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); UpstreamRecordRequests.push_back(&RecordKey); } } } if (!UpstreamRecordRequests.empty()) { const auto OnCacheRecordGetComplete = [this, Namespace, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } CacheKeyRequest& RecordKey = Params.Request; size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); RecordBody& Record = Records[RecordIndex]; const CacheKey& Key = RecordKey.Key; Record.Exists = true; CbObject ObjectBuffer = CbObject::Clone(Params.Record); Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = "UPSTREAM"sv; if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { m_CacheStore.Put(Namespace, Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; m_UpstreamCache.GetCacheRecords(Namespace, UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } std::vector UpstreamPayloadRequests; for (ChunkRequest* Request : RecordRequests) { if (Request->Key->ChunkId == IoHash::Zero) { // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) // is missing, parse the cache record and try to find the raw hash from the ValueId. RecordBody& Record = *Request->Record; if (!Record.ValuesRead) { Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); Record.Values.reserve(ValuesArray.Num()); for (CbFieldView ValueField : ValuesArray) { CbObjectView ValueObject = ValueField.AsObjectView(); Oid ValueId = ValueObject["Id"sv].AsObjectId(); CbFieldView RawHashField = ValueObject["RawHash"sv]; IoHash RawHash = RawHashField.AsBinaryAttachment(); if (ValueId && !RawHashField.HasError()) { Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); } } } } for (const RecordValue& Value : Record.Values) { if (Value.ValueId == Request->Key->ValueId) { Request->Key->ChunkId = Value.ContentId; Request->TotalSize = Value.RawSize; Request->TotalSizeKnown = true; break; } } } // Now load the ContentId from the local ContentIdStore or from the upstream if (Request->Key->ChunkId != IoHash::Zero) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; Request->Source = "LOCAL"sv; } } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); if (Compressed) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } Request->Exists = true; Request->TotalSize = Compressed.GetRawSize(); Request->TotalSizeKnown = true; Request->Source = "LOCAL"sv; } } } if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); OutUpstreamChunks.push_back(Request->Key); } } } } void HttpStructuredCacheService::GetLocalCacheValues(std::string_view Namespace, std::vector& ValueRequests, std::vector& OutUpstreamChunks) { using namespace cache::detail; for (ChunkRequest* Request : ValueRequests) { if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; if (m_CacheStore.Get(Namespace, Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); Request->Exists = true; Request->TotalSize = Result.GetRawSize(); Request->TotalSizeKnown = true; Request->Source = "LOCAL"sv; } } } } if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) { // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it // locally Request->Key->RawOffset = 0; Request->Key->RawSize = UINT64_MAX; } OutUpstreamChunks.push_back(Request->Key); } } } void HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Namespace, std::vector& UpstreamChunks, std::vector& RequestKeys, std::vector& Requests) { using namespace cache::detail; if (!UpstreamChunks.empty()) { const auto OnCacheChunksGetComplete = [this, Namespace, &RequestKeys, &Requests](CacheChunkGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; } CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); if (!Compressed || Compressed.GetRawSize() != Params.RawSize || IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) { return; } if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) { if (Request.IsRecordRequest) { m_CidStore.AddChunk(Compressed); } else { m_CacheStore.Put(Namespace, Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { Request.Value = std::move(Compressed); } } Key.ChunkId = Params.RawHash; Request.Exists = true; Request.TotalSize = Params.RawSize; Request.TotalSizeKnown = true; Request.Source = "UPSTREAM"sv; m_CacheStats.UpstreamHitCount++; }; m_UpstreamCache.GetCacheChunks(Namespace, UpstreamChunks, std::move(OnCacheChunksGetComplete)); } } void HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; CbPackage RpcResponse; CbObjectWriter Writer; Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { Writer.BeginObject(); { if (Request.Exists) { Writer.AddHash("RawHash"sv, Request.Key->ChunkId); if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Request.Value)); } else { Writer.AddInteger("RawSize"sv, Request.TotalSize); } ZEN_DEBUG("HIT - '{}/{}/{}/{}' {} '{}' ({})", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, NiceBytes(Request.TotalSize), Request.IsRecordRequest ? "Record"sv : "Value"sv, Request.Source); m_CacheStats.HitCount++; } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { ZEN_DEBUG("SKIP - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { ZEN_DEBUG("MISS - '{}/{}/{}/{}'", Namespace, Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } Writer.EndObject(); } Writer.EndArray(); RpcResponse.SetObject(Writer.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t TotalCount = HitCount + MissCount; const CidStoreSize CidSize = m_CidStore.TotalSize(); const GcStorageSize CacheSize = m_CacheStore.StorageSize(); Cbo.BeginObject("cache"); Cbo.BeginObject("size"); Cbo << "disk" << CacheSize.DiskSize; Cbo << "memory" << CacheSize.MemorySize; Cbo.EndObject(); Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "hits" << HitCount << "misses" << MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo.EndObject(); Cbo.BeginObject("upstream"); m_UpstreamCache.GetStatus(Cbo); Cbo.EndObject(); Cbo.BeginObject("cid"); Cbo.BeginObject("size"); Cbo << "tiny" << CidSize.TinySize; Cbo << "small" << CidSize.SmallSize; Cbo << "large" << CidSize.LargeSize; Cbo << "total" << CidSize.TotalSize; Cbo.EndObject(); Cbo.EndObject(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } #if ZEN_WITH_TESTS TEST_CASE("z$service.parse.relative.Uri") { HttpRequestData LegacyBucketRequestBecomesNamespaceRequest; CHECK(HttpRequestParseRelativeUri("test", LegacyBucketRequestBecomesNamespaceRequest)); CHECK(LegacyBucketRequestBecomesNamespaceRequest.Namespace == "test"sv); CHECK(!LegacyBucketRequestBecomesNamespaceRequest.Bucket.has_value()); CHECK(!LegacyBucketRequestBecomesNamespaceRequest.HashKey.has_value()); CHECK(!LegacyBucketRequestBecomesNamespaceRequest.ValueContentId.has_value()); HttpRequestData LegacyHashKeyRequest; CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", LegacyHashKeyRequest)); CHECK(LegacyHashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); CHECK(LegacyHashKeyRequest.Bucket == "test"sv); CHECK(LegacyHashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); CHECK(!LegacyHashKeyRequest.ValueContentId.has_value()); HttpRequestData LegacyValueContentIdRequest; CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", LegacyValueContentIdRequest)); CHECK(LegacyValueContentIdRequest.Namespace == ZenCacheStore::DefaultNamespace); CHECK(LegacyValueContentIdRequest.Bucket == "test"sv); CHECK(LegacyValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); CHECK(LegacyValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); HttpRequestData V2DefaultNamespaceRequest; CHECK(HttpRequestParseRelativeUri("ue4.ddc", V2DefaultNamespaceRequest)); CHECK(V2DefaultNamespaceRequest.Namespace == "ue4.ddc"); CHECK(!V2DefaultNamespaceRequest.Bucket.has_value()); CHECK(!V2DefaultNamespaceRequest.HashKey.has_value()); CHECK(!V2DefaultNamespaceRequest.ValueContentId.has_value()); HttpRequestData V2NamespaceRequest; CHECK(HttpRequestParseRelativeUri("nicenamespace", V2NamespaceRequest)); CHECK(V2NamespaceRequest.Namespace == "nicenamespace"sv); CHECK(!V2NamespaceRequest.Bucket.has_value()); CHECK(!V2NamespaceRequest.HashKey.has_value()); CHECK(!V2NamespaceRequest.ValueContentId.has_value()); HttpRequestData V2BucketRequestWithDefaultNamespace; CHECK(HttpRequestParseRelativeUri("ue4.ddc/test", V2BucketRequestWithDefaultNamespace)); CHECK(V2BucketRequestWithDefaultNamespace.Namespace == "ue4.ddc"); CHECK(V2BucketRequestWithDefaultNamespace.Bucket == "test"sv); CHECK(!V2BucketRequestWithDefaultNamespace.HashKey.has_value()); CHECK(!V2BucketRequestWithDefaultNamespace.ValueContentId.has_value()); HttpRequestData V2BucketRequestWithNamespace; CHECK(HttpRequestParseRelativeUri("nicenamespace/test", V2BucketRequestWithNamespace)); CHECK(V2BucketRequestWithNamespace.Namespace == "nicenamespace"sv); CHECK(V2BucketRequestWithNamespace.Bucket == "test"sv); CHECK(!V2BucketRequestWithNamespace.HashKey.has_value()); CHECK(!V2BucketRequestWithNamespace.ValueContentId.has_value()); HttpRequestData V2HashKeyRequest; CHECK(HttpRequestParseRelativeUri("test/0123456789abcdef12340123456789abcdef1234", V2HashKeyRequest)); CHECK(V2HashKeyRequest.Namespace == ZenCacheStore::DefaultNamespace); CHECK(V2HashKeyRequest.Bucket == "test"); CHECK(V2HashKeyRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); CHECK(!V2HashKeyRequest.ValueContentId.has_value()); HttpRequestData V2ValueContentIdRequest; CHECK( HttpRequestParseRelativeUri("nicenamespace/test/0123456789abcdef12340123456789abcdef1234/56789abcdef12345678956789abcdef123456789", V2ValueContentIdRequest)); CHECK(V2ValueContentIdRequest.Namespace == "nicenamespace"sv); CHECK(V2ValueContentIdRequest.Bucket == "test"sv); CHECK(V2ValueContentIdRequest.HashKey == IoHash::FromHexString("0123456789abcdef12340123456789abcdef1234"sv)); CHECK(V2ValueContentIdRequest.ValueContentId == IoHash::FromHexString("56789abcdef12345678956789abcdef123456789"sv)); HttpRequestData Invalid; CHECK(!HttpRequestParseRelativeUri("", Invalid)); CHECK(!HttpRequestParseRelativeUri("/", Invalid)); CHECK(!HttpRequestParseRelativeUri("bad\2_namespace", Invalid)); CHECK(!HttpRequestParseRelativeUri("nice/\2\1bucket", Invalid)); CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789a", Invalid)); CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcdef1234", Invalid)); CHECK(!HttpRequestParseRelativeUri("namespace/bucket/pppppppp89abcdef12340123456789abcdef1234", Invalid)); CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/56789abcd", Invalid)); CHECK(!HttpRequestParseRelativeUri("namespace/bucket/0123456789abcdef12340123456789abcdef1234/ppppppppdef12345678956789abcdef123456789", Invalid)); } namespace testutils { class MemCacheStore : public ZenCacheStoreBase { public: virtual bool Get(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue) override final { if (auto NamespaceIt = m_Store.find(std::string(Namespace)); NamespaceIt != m_Store.end()) { if (auto BucketIt = NamespaceIt->second.find(std::string(Bucket)); BucketIt != NamespaceIt->second.end()) { if (auto ValueIt = BucketIt->second.find(HashKey); ValueIt != BucketIt->second.end()) { OutValue = ValueIt->second; return true; } } } return false; }; virtual void Put(std::string_view Namespace, std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value) override final { m_Store[std::string(Namespace)][std::string(Bucket)][HashKey] = Value; }; private: typedef std::unordered_map Bucket; typedef std::unordered_map Namespace; std::map m_Store; }; class MemCidStore : public CidStoreBase { public: virtual InsertResult AddChunk(const CompressedBuffer& ChunkData) override final { IoHash Key = IoHash::FromBLAKE3(ChunkData.GetRawHash()); IoBuffer Data = ChunkData.GetCompressed().Flatten().AsIoBuffer(); if (auto It = m_Store.find(Key); It != m_Store.end()) { It->second = Data; return InsertResult{.New = false}; } m_Store[Key] = Data; return InsertResult{.New = true}; } virtual IoBuffer FindChunkByCid(const IoHash& DecompressedId) override final { if (auto It = m_Store.find(DecompressedId); It != m_Store.end()) { return It->second; } return IoBuffer{}; } virtual bool ContainsChunk(const IoHash& DecompressedId) override final { return m_Store.contains(DecompressedId); } private: std::unordered_map m_Store; }; class LambdaUpstream : public UpstreamCache { public: typedef std::function IncludeIndexes)> GetCacheRecordsFunc; std::vector GetCacheRecordsCallbacks; LambdaUpstream() {} void Initialize() override final {} void RegisterEndpoint(std::unique_ptr) override final {} void IterateEndpoints(std::function&&) override final{}; void GetCacheRecords(const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) override final { CHECK(GetCacheRecordsCallbacks.size() > 0); GetCacheRecordsCallbacks[0](Request, Policy, OutResult, IncludeIndexes); GetCacheRecordsCallbacks.erase(GetCacheRecordsCallbacks.begin()); } GetUpstreamCacheResult GetCacheRecord(std::string_view, const CacheKey&, ZenContentType) override final { return GetUpstreamCacheResult{}; } void GetCacheRecords(std::string_view, std::span, OnCacheRecordGetComplete&&) override {} GetUpstreamCacheResult GetCacheValue(std::string_view, const CacheKey&, const IoHash&) override final { return GetUpstreamCacheResult{}; } void GetCacheValues(std::string_view, std::span, OnCacheValueGetComplete&&) override final {} void GetCacheChunks(std::string_view, std::span, OnCacheChunksGetComplete&&) override final {} void EnqueueUpstream(UpstreamCacheRecord) override final {} void GetStatus(CbObjectWriter&) override final {} }; UniqueBuffer GenerateData(uint64_t Size) { auto Buf = zen::UniqueBuffer::Alloc(Size); uint8_t* Data = reinterpret_cast(Buf.GetData()); for (uint64_t Idx = 0; Idx < Size; Idx++) { Data[Idx] = Idx % 256; } return Buf; }; CompressedBuffer GenerateCompressedValue(uint64_t Size) { UniqueBuffer Data = GenerateData(Size); IoBuffer Buffer(IoBuffer::Wrap, Data.GetData(), Data.GetSize()); return CompressedBuffer::Compress(SharedBuffer(Buffer)); }; IoHash Put(CidStoreBase& Store, const CompressedBuffer& ChunkData) { IoHash Key = IoHash::FromBLAKE3(ChunkData.GetRawHash()); Store.AddChunk(ChunkData); return Key; } void Put(CidStoreBase& Store, std::span ChunkDatas) { for (auto ChunkData : ChunkDatas) { Put(Store, ChunkData); } } size_t Count(CidStoreBase& Store, std::span ChunkDatas) { size_t Found = 0; for (const auto& ChunkData : ChunkDatas) { if (Store.ContainsChunk(IoHash::FromBLAKE3(ChunkData.GetRawHash()))) { Found++; } } return Found; } cacherequests::CacheRecord CreateRecord(std::string_view Bucket, std::span AttachmentSizes, std::vector& OutAttachments) { cacherequests::CacheRecord Record; IoHashStream Hasher; Hasher.Append(Bucket.data(), Bucket.size()); Hasher.Append(AttachmentSizes.data(), AttachmentSizes.size() * sizeof(size_t)); Record.Key.Hash = Hasher.GetHash(); Record.Key.Bucket = Bucket; Record.Values.reserve(AttachmentSizes.size()); OutAttachments.reserve(AttachmentSizes.size()); for (const auto& AttachmentSize : AttachmentSizes) { CompressedBuffer Attachment = GenerateCompressedValue(AttachmentSize); Record.Values.push_back( {.Id = Oid::NewOid(), .RawHash = IoHash::FromBLAKE3(Attachment.GetRawHash()), .RawSize = Attachment.GetRawSize()}); OutAttachments.emplace_back(Attachment); } return Record; } IoBuffer RecordToBuffer(const cacherequests::CacheRecord Record) { CbObjectWriter Writer; Record.Format(Writer); BinaryWriter MemWriter; Writer.Save(MemWriter); IoBuffer Result(IoBuffer::Clone, MemWriter.Data(), MemWriter.Size()); Result.SetContentType(ZenContentType::kCbObject); return Result; } void Put(ZenCacheStoreBase& CacheStore, std::string_view Namespace, const std::span Records) { for (const auto& Record : Records) { CacheStore.Put(Namespace, Record.Key.Bucket, Record.Key.Hash, {.Value = RecordToBuffer(Record)}); } } void Put(ZenCacheStoreBase& CacheStore, std::string_view Namespace, const cacherequests::CacheRecord& Record) { Put(CacheStore, Namespace, std::vector({Record})); } size_t Count(ZenCacheStoreBase& CacheStore, std::string_view Namespace, const std::span Keys) { size_t Found = 0; ZenCacheValue RecordValue; for (const auto& Key : Keys) { if (CacheStore.Get(Namespace, Key.Bucket, Key.Hash, RecordValue)) { Found++; } } return Found; } size_t Count(ZenCacheStoreBase& CacheStore, std::string_view Namespace, const CacheKey& Key) { return Count(CacheStore, Namespace, std::vector({Key})); } } // namespace testutils TEST_CASE("zcache.getcacherecords") { using namespace testutils; MemCidStore CidStore; MemCacheStore CacheStore; LambdaUpstream Upstream; std::atomic_uint64_t HitCount = 0; std::atomic_uint64_t UpstreamHitCount = 0; std::atomic_uint64_t MissCount = 0; std::vector> Attachments; Attachments.resize(3); std::vector Records = {CreateRecord("bucket", std::vector({777, 412, 133}), Attachments[0]), CreateRecord("bucket", std::vector({58, 8634, 34}), Attachments[1]), CreateRecord("bucket", std::vector({46, 828, 976}), Attachments[2])}; SUBCASE("defaultpolicy.notincache") { cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(cacherequests::GetEffectiveRecordPolicy(Policy, 0) == Policy.DefaultPolicy); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(!Result.Results[0].has_value()); CHECK(HitCount == 0); CHECK(UpstreamHitCount == 0); CHECK(MissCount == 1); } SUBCASE("defaultpolicy.noattachmentsincache") { Put(CacheStore, "namespace", Records[0]); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(Policy.RecordPolicies[0].has_value()); CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); // We did not request any partial results, so we should not get anything back CHECK(Result.Results.size() == 1); CHECK(!Result.Results[0].has_value()); CHECK(HitCount == 0); CHECK(UpstreamHitCount == 0); CHECK(MissCount == 1); } SUBCASE("acceptpartial.noattachmentsincache") { Put(CacheStore, "namespace", Records[0]); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; Policy.DefaultPolicy |= CachePolicy::PartialRecord; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::PartialRecord | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(Policy.RecordPolicies[0].has_value()); CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(Result.Results[0].has_value()); CHECK(Result.Results[0]->Values.size() == 3); CHECK(!Result.Results[0]->Values[0].Body); CHECK(!Result.Results[0]->Values[1].Body); CHECK(!Result.Results[0]->Values[2].Body); CHECK(HitCount == 1); CHECK(UpstreamHitCount == 0); CHECK(MissCount == 0); } SUBCASE("defaultpolicy.allattachmentsinlocal") { Put(CacheStore, "namespace", Records[0]); Put(CidStore, Attachments[0]); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(Result.Results[0].has_value()); CHECK(Result.Results[0]->Values.size() == 3); CHECK(Result.Results[0]->Values[0].Body); CHECK(Result.Results[0]->Values[1].Body); CHECK(Result.Results[0]->Values[2].Body); CHECK(HitCount == 1); CHECK(UpstreamHitCount == 0); CHECK(MissCount == 0); } SUBCASE("defaultpolicy.allfromupstream") { cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(!Policy.RecordPolicies[0].has_value()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); OutResult.Results[0] = cacherequests::GetCacheRecordResult{ .Key = Records[0].Key, .Values = {cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[0].Id, .RawHash = Records[0].Values[0].RawHash, .RawSize = Records[0].Values[0].RawSize, .Body = Attachments[0][0]}, cacherequests::GetCacheRecordResultValue{.Id = Records[1].Values[1].Id, .RawHash = Records[1].Values[1].RawHash, .RawSize = Records[1].Values[1].RawSize, .Body = Attachments[0][1]}, cacherequests::GetCacheRecordResultValue{.Id = Records[2].Values[2].Id, .RawHash = Records[2].Values[2].RawHash, .RawSize = Records[2].Values[2].RawSize, .Body = Attachments[0][2]}}}; }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(Result.Results[0].has_value()); CHECK(Result.Results[0]->Values.size() == 3); CHECK(Result.Results[0]->Values[0].Body); CHECK(Result.Results[0]->Values[1].Body); CHECK(Result.Results[0]->Values[2].Body); CHECK(Count(CacheStore, "namespace", Records[0].Key) == 1); CHECK(Count(CidStore, Attachments[0]) == 3); CHECK(HitCount == 1); CHECK(UpstreamHitCount == 1); CHECK(MissCount == 0); } SUBCASE("defaultpolicy.attachmentsfromupstream") { Put(CacheStore, "namespace", Records[0]); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(Policy.RecordPolicies[0].has_value()); CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); CHECK(Policy.RecordPolicies[0]->IsUniform()); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); OutResult.Results[0] = cacherequests::GetCacheRecordResult{ .Key = Records[0].Key, .Values = {cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[0].Id, .RawHash = Records[0].Values[0].RawHash, .RawSize = Records[0].Values[0].RawSize, .Body = Attachments[0][0]}, cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[1].Id, .RawHash = Records[0].Values[1].RawHash, .RawSize = Records[0].Values[1].RawSize, .Body = Attachments[0][1]}, cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[2].Id, .RawHash = Records[0].Values[2].RawHash, .RawSize = Records[0].Values[2].RawSize, .Body = Attachments[0][2]}}}; }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(Result.Results[0].has_value()); CHECK(Result.Results[0]->Values.size() == 3); CHECK(Result.Results[0]->Values[0].Body); CHECK(Result.Results[0]->Values[1].Body); CHECK(Result.Results[0]->Values[2].Body); ZenCacheValue RecordValue; CHECK(Count(CacheStore, "namespace", Records[0].Key) == 1); CHECK(Count(CidStore, Attachments[0]) == 3); CHECK(HitCount == 1); CHECK(UpstreamHitCount == 1); CHECK(MissCount == 0); } SUBCASE("defaultpolicy.partialattachmentsfromupstream") { Put(CacheStore, "namespace", Records[0]); Put(CidStore, std::vector{Attachments[0][0], Attachments[0][2]}); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Default, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; Upstream.GetCacheRecordsCallbacks.emplace_back([&](const cacherequests::GetCacheRecordsRequest& Request, const cacherequests::RecordsRequestPolicy& Policy, cacherequests::GetCacheRecordsResult& OutResult, const std::span IncludeIndexes) { CHECK(Policy.DefaultPolicy == (CachePolicy::Default | CachePolicy::SkipMeta)); CHECK(Request.Requests.size() == 1); CHECK(Policy.RecordPolicies.size() == 1); CHECK(Policy.RecordPolicies[0].has_value()); CHECK(Policy.RecordPolicies[0]->GetRecordPolicy() == Policy.DefaultPolicy); CHECK(!Policy.RecordPolicies[0]->IsUniform()); CHECK(EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[0].Id), CachePolicy::SkipData)); CHECK(!EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[1].Id), CachePolicy::SkipData)); CHECK(EnumHasAnyFlags(Policy.RecordPolicies[0]->GetValuePolicy(Records[0].Values[2].Id), CachePolicy::SkipData)); CHECK(IncludeIndexes.size() == 1); CHECK(IncludeIndexes[0] == 0); OutResult.Results.resize(1); CHECK(OutResult.Results[0].has_value()); OutResult.Results[0]->Values[1] = cacherequests::GetCacheRecordResultValue{.Id = Records[0].Values[1].Id, .RawHash = Records[0].Values[1].RawHash, .RawSize = Records[0].Values[1].RawSize, .Body = Attachments[0][1]}; }); CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(Result.Results[0].has_value()); CHECK(Result.Results[0]->Values.size() == 3); CHECK(Result.Results[0]->Values[0].Body); CHECK(Result.Results[0]->Values[1].Body); CHECK(Result.Results[0]->Values[2].Body); CHECK(Count(CacheStore, "namespace", Records[0].Key) == 1); CHECK(Count(CidStore, Attachments[0]) == 3); CHECK(HitCount == 1); CHECK(UpstreamHitCount == 1); CHECK(MissCount == 0); } SUBCASE("defaultpolicy.attachmentsinupstreamnoupstreampolicy") { Put(CacheStore, "namespace", Records[1]); cacherequests::GetCacheRecordsRequest Request = {.Namespace = "namespace", .Requests = {Records[0].Key}}; cacherequests::RecordsRequestPolicy Policy = {.DefaultPolicy = CachePolicy::Local, .RecordPolicies = {{}}}; cacherequests::GetCacheRecordsResult Result; CHECK(impl::GetCacheRecords(CacheStore, CidStore, Upstream, HitCount, UpstreamHitCount, MissCount, Request, Policy, Result) == HttpResponseCode::OK); CHECK(Result.Results.size() == 1); CHECK(!Result.Results[0].has_value()); ZenCacheValue RecordValue; CHECK(Count(CacheStore, "namespace", Records[0].Key) == 0); CHECK(Count(CidStore, Attachments[0]) == 0); CHECK(HitCount == 0); CHECK(UpstreamHitCount == 0); CHECK(MissCount == 1); } } #endif void z$service_forcelink() { } } // namespace zen