// Copyright Epic Games, Inc. All Rights Reserved. #include "structuredcache.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include //#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include #include #include #include #include #include namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { std::string_view PolicyText = QueryParams.GetValue("Policy"sv); return !PolicyText.empty() ? zen::ParseCachePolicy(PolicyText) : CachePolicy::Default; } CacheRecordPolicy LoadCacheRecordPolicy(CbObjectView Object, CachePolicy DefaultPolicy = CachePolicy::Default) { OptionalCacheRecordPolicy Policy = CacheRecordPolicy::Load(Object); return Policy ? 
std::move(Policy).Get() : CacheRecordPolicy(DefaultPolicy); } struct AttachmentCount { uint32_t New = 0; uint32_t Valid = 0; uint32_t Invalid = 0; uint32_t Total = 0; }; struct PutRequestData { CacheKey Key; CbObjectView RecordObject; CacheRecordPolicy Policy; }; ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CidStore& InCidStore, HttpStatsService& StatsService, HttpStatusService& StatusService, UpstreamCache& UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) , m_UpstreamCache(UpstreamCache) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); m_StatsService.UnregisterHandler("z$", *this); m_StatusService.UnregisterHandler("z$", *this); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; metrics::OperationTiming::Scope $(m_HttpRequests); if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); if (Key == "$rpc") { return HandleRpcRequest(Request); } if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference return HandleCacheBucketRequest(Request, Key); } return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } CachePolicy PolicyFromURL = ParseCachePolicy(Request.GetQueryParams()); if (Ref.ValueContentId == IoHash::Zero) { return 
HandleCacheRecordRequest(Request, Ref, PolicyFromURL); } else { return HandleCacheValueRequest(Request, Ref, PolicyFromURL); } return; } void HttpStructuredCacheService::RegisterHandlers(WebSocketServer& Server) { Server.RegisterRequestHandler("GetBinaryCacheValue"sv, *this); Server.RegisterRequestHandler("GetCacheValues"sv, *this); Server.RegisterRequestHandler("GetCacheRecords"sv, *this); Server.RegisterRequestHandler("GetCacheChunks"sv, *this); } class ResponseStreamWriter { public: ResponseStreamWriter(WebSocketServer& Server, const WebSocketMessage& Request, uint32_t RequestCount, uint32_t MaxBatchCount = ~uint32_t(0)); ~ResponseStreamWriter(); template void Append(Fn&& Append) { ZEN_ASSERT(m_ResponseCount < m_RequestCount); if (m_CurrentBatchCount == 0) { m_Writer.BeginObject(); m_Writer.BeginArray("Result"sv); } Append(m_StreamResponse, m_Writer); if (++m_CurrentBatchCount >= m_MaxBatchCount) { SendCurrentBatch(); } } bool Flush(); private: bool SendCurrentBatch(); WebSocketServer& m_Server; CbWriter m_Writer; CbPackage m_StreamResponse; WebSocketId m_SocketId; uint32_t m_CorrelationId; uint32_t m_RequestCount; uint32_t m_MaxBatchCount; uint32_t m_CurrentBatchCount{0}; uint32_t m_ResponseCount{0}; }; ResponseStreamWriter::ResponseStreamWriter(WebSocketServer& Server, const WebSocketMessage& Request, uint32_t RequestCount, uint32_t MaxBatchCount) : m_Server(Server) , m_SocketId(Request.SocketId()) , m_CorrelationId(Request.CorrelationId()) , m_RequestCount(RequestCount) , m_MaxBatchCount(MaxBatchCount) { } ResponseStreamWriter::~ResponseStreamWriter() { Flush(); } bool ResponseStreamWriter::SendCurrentBatch() { if (m_CurrentBatchCount > 0) { m_Writer.EndArray(); m_Writer.EndObject(); m_ResponseCount += m_CurrentBatchCount; m_CurrentBatchCount = 0; CbPackage StreamResponse = std::move(m_StreamResponse); StreamResponse.SetObject(m_Writer.Save().AsObject()); m_StreamResponse.Reset(); m_Writer.Reset(); WebSocketMessage Message; 
Message.SetMessageType(m_ResponseCount == m_RequestCount ? WebSocketMessageType::kStreamCompleteResponse : WebSocketMessageType::kStreamResponse); Message.SetCorrelationId(m_CorrelationId); Message.SetSocketId(m_SocketId); Message.SetBody(std::move(StreamResponse)); m_Server.SendResponse(std::move(Message)); } return m_ResponseCount == m_RequestCount; } bool ResponseStreamWriter::Flush() { return SendCurrentBatch(); } bool HttpStructuredCacheService::HandleRequest(const WebSocketMessage& RequestMessage) { const uint32_t kMaxBatchCount = 16; CbObjectView Request = RequestMessage.Body().GetObject(); const auto Method = Request["Method"].AsString(); CbObjectView Params = Request["Params"sv].AsObjectView(); if (Method == "GetBinaryCacheValue"sv) { ZEN_TRACE_CPU("Z$::WS_GetBinaryCacheValue"); // CachePolicy Policy; CbObjectView KeyObject = Params["Key"sv].AsObjectView(); CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); ZenCacheValue CacheValue; const bool InLocalCache = m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue); CbPackage Response; if (InLocalCache) { m_CacheStats.HitCount++; CbAttachment Attachment(SharedBuffer(CacheValue.Value)); CbObjectWriter ResponseObject; ResponseObject.AddAttachment("Result", Attachment); Response.AddAttachment(std::move(Attachment)); Response.SetObject(ResponseObject.Save()); ZenContentType ContentType = CacheValue.Value.GetContentType(); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Key.Bucket, Key.Hash, NiceBytes(CacheValue.Value.Size()), ToString(ContentType)); } else { m_CacheStats.MissCount++; CbObjectWriter ResponseObject; ResponseObject << "Error"sv << "Not Found"sv; Response.SetObject(ResponseObject.Save()); ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kBinary)); } WebSocketMessage ResponseMessage; ResponseMessage.SetMessageType(WebSocketMessageType::kResponse); ResponseMessage.SetCorrelationId(RequestMessage.CorrelationId()); 
ResponseMessage.SetSocketId(RequestMessage.SocketId()); ResponseMessage.SetBody(std::move(Response)); SocketServer().SendResponse(std::move(ResponseMessage)); return true; } if (Method == "GetCacheValues"sv) { ZEN_TRACE_CPU("Z$::WS_GetCacheValues"); const std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = DefaultPolicyText.empty() ? CachePolicy::Default : ParseCachePolicy(DefaultPolicyText); CbArrayView Requests = Params["Requests"sv].AsArrayView(); const uint64_t RequestCount = Requests.Num(); ResponseStreamWriter ResponseStream(SocketServer(), RequestMessage, uint32_t(RequestCount), kMaxBatchCount); for (int32_t Idx = 0; CbFieldView RequestField : Params["Requests"sv]) { const int32_t RequestIndex = Idx++; CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); std::string_view PolicyText = RequestObject["Policy"sv].AsString(); CachePolicy Policy = PolicyText.empty() ? DefaultPolicy : ParseCachePolicy(PolicyText); CompressedBuffer Compressed; bool InLocalCache = false; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) { Compressed = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); InLocalCache = true; } } if (Compressed.IsNull() && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { if (auto UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); UpstreamResult.Success) { Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); if (Compressed) { UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); } } } const uint64_t RawSize = Compressed.IsNull() ? 
0 : Compressed.GetRawSize(); ResponseStream.Append([&RequestIndex, &Policy, &Compressed, &RawSize](CbPackage& Response, CbWriter& ResponseObject) { ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); if (Compressed) { const IoHash RawHash = IoHash::FromBLAKE3(Compressed.GetRawHash()); ResponseObject.AddHash("RawHash"sv, RawHash); if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { ResponseObject.AddInteger("RawSize"sv, RawSize); } else { Response.AddAttachment(CbAttachment(std::move(Compressed))); } } ResponseObject.EndObject(); }); if (RawSize > 0) { ZEN_DEBUG("HIT - '{}/{}' {} '{}'", Key.Bucket, Key.Hash, NiceBytes(RawSize), ToString(ZenContentType::kCompressedBinary)); } else { ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); } } const bool IsStreamComplete = ResponseStream.Flush(); ZEN_ASSERT(IsStreamComplete); return true; } if (Method == "GetCacheRecords"sv) { ZEN_TRACE_CPU("Z$::WS_GetCacheRecords"); const std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = DefaultPolicyText.empty() ? 
CachePolicy::Default : ParseCachePolicy(DefaultPolicyText); std::vector UpstreamRequests; CbArrayView Requests = Params["Requests"sv].AsArrayView(); const uint64_t RequestCount = Requests.Num(); ResponseStreamWriter ResponseStream(SocketServer(), RequestMessage, uint32_t(RequestCount), kMaxBatchCount); for (int32_t Idx = 0; CbFieldView RequestField : Requests) { const int32_t RequestIndex = Idx++; CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal)) { ZenCacheValue RecordCacheValue; if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) { CbObjectView RecordObject = CbObjectView(RecordCacheValue.Value.GetData()); CbArrayView RecordValuesView = RecordObject["Values"sv].AsArrayView(); uint64_t TotalSize = RecordCacheValue.Value.GetSize(); std::vector Values; Values.reserve(RecordValuesView.Num()); for (CbFieldView ValueField : RecordValuesView) { CbObjectView ValueObject = ValueField.AsObjectView(); Oid ValueId = ValueObject["Id"sv].AsObjectId(); IoHash RawHash = ValueObject["RawHash"sv].AsHash(); CachePolicy ValuePolicy = Policy.GetValuePolicy(ValueId); if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { if (IoBuffer Value = m_CidStore.FindChunkByCid(RawHash)) { Values.push_back(Value); TotalSize += Value.GetSize(); } } } const bool IsComplete = Values.size() == RecordValuesView.Num(); if (IsComplete) { ResponseStream.Append([&](CbPackage& Response, CbWriter& ResponseObject) { ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); ResponseObject.AddObject("Record"sv, CbObject::Clone(RecordObject)); ResponseObject.EndObject(); for (const IoBuffer& Value : Values) { 
Response.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Value)))); } }); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Key.Bucket, Key.Hash, NiceBytes(TotalSize), ToString(ZenContentType::kCbObject)); continue; } } } // if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote) == false) { ResponseStream.Append([&](CbPackage&, CbWriter& ResponseObject) { ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); ResponseObject.EndObject(); }); ZEN_DEBUG("MISS - '{}/{}'", Key.Bucket, Key.Hash); continue; } // UpstreamRequests.push_back({.Key = Key, .Policy = Policy, .UserData = uint64_t(RequestIndex)}); } bool IsStreamComplete = ResponseStream.Flush(); if (UpstreamRequests.empty()) { ZEN_ASSERT(IsStreamComplete); return true; } auto OnCacheRecordGetComplete = [this, &ResponseStream](CacheRecordGetCompleteParams&& Params) mutable { if (Params.Record) { CbArrayView RecordValuesView = Params.Record["Values"sv].AsArrayView(); uint32_t AttachmentCount{}; uint64_t TotalSize = Params.Record.GetSize(); std::vector Attachments; for (CbFieldView ValueField : RecordValuesView) { CbObjectView ValueObject = ValueField.AsObjectView(); Oid ValueId = ValueObject["Id"sv].AsObjectId(); IoHash RawHash = ValueObject["RawHash"sv].AsHash(); CachePolicy ValuePolicy = Params.Request.Policy.GetValuePolicy(ValueId); if (const CbAttachment* Attachment = Params.Package.FindAttachment(RawHash)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Attachments.emplace_back(Compressed); // Response.AddAttachment(CbAttachment(Compressed)); AttachmentCount++; TotalSize += Compressed.GetCompressedSize(); if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { IoBuffer Value = Compressed.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); m_CacheStore.Put(Params.Request.Key.Bucket, Params.Request.Key.Hash, {.Value = Value}); } } } else if 
(EnumHasAllFlags(Params.Request.Policy.GetRecordPolicy(), CachePolicy::QueryLocal)) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(RawHash)) { // Response.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); Attachments.emplace_back(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))); AttachmentCount++; } } } const bool IsComplete = AttachmentCount == RecordValuesView.Num(); const bool AllowPartial = EnumHasAllFlags(Params.Request.Policy.GetRecordPolicy(), CachePolicy::PartialRecord); if (IsComplete || AllowPartial) { ResponseStream.Append([&](CbPackage& Response, CbWriter& ResponseObject) { for (CbAttachment& Attachment : Attachments) { Response.AddAttachment(std::move(Attachment)); } ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, int32_t(Params.Request.UserData)); ResponseObject.AddObject("Record"sv, CbObject::Clone(Params.Record)); ResponseObject.EndObject(); }); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", Params.Request.Key.Bucket, Params.Request.Key.Hash, NiceBytes(TotalSize), ToString(ZenContentType::kCbObject)); return; } } ResponseStream.Append([&](CbPackage&, CbWriter& ResponseObject) { ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, int32_t(Params.Request.UserData)); ResponseObject.EndObject(); }); ZEN_DEBUG("MISS - '{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash); }; // TODO: Fix this std::vector RequestPtrs; RequestPtrs.reserve(UpstreamRequests.size()); for (CacheKeyRequest& Req : UpstreamRequests) { RequestPtrs.push_back(&Req); } m_UpstreamCache.GetCacheRecords(RequestPtrs, std::move(OnCacheRecordGetComplete)); IsStreamComplete = ResponseStream.Flush(); ZEN_ASSERT(IsStreamComplete); return true; } if (Method == "GetCacheChunks"sv) { ZEN_TRACE_CPU("Z$::WS_GetCacheChunks"); auto GetCidFromValueId = [](const Oid& ValueId, CbObjectView Record, uint64_t& OutRawSize) -> IoHash { CbArrayView Values = Record["Values"sv].AsArrayView(); for (CbFieldView Value 
: Values) { CbObjectView ValueObject = Value.AsObjectView(); if (ValueObject["Id"sv].AsObjectId() == ValueId) { OutRawSize = ValueObject["RawSize"sv].AsUInt64(); return ValueObject["RawHash"sv].AsHash(); } } return IoHash::Zero; }; CacheKey CurrentKey; IoBuffer CurrentRecordValue; CbArrayView Requests = Params["Requests"sv].AsArrayView(); const uint64_t RequestCount = Requests.Num(); ResponseStreamWriter ResponseStream(SocketServer(), RequestMessage, uint32_t(RequestCount), kMaxBatchCount); for (int32_t Idx = 0; CbFieldView RequestField : Requests) { CbObjectView RequestObject = RequestField.AsObjectView(); const int32_t RequestIndex = Idx++; CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); const IoHash RawHash = RequestObject["ChunkId"sv].AsHash(); const Oid ValueId = RequestObject["ValueId"sv].AsObjectId(); const uint64_t RequestedRawOffset = RequestObject["RawOffset"sv].AsUInt64(); const uint64_t RequestedRawSize = RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); IoHash Cid = RawHash; uint64_t RawSize = 0; if (RawHash == IoHash::Zero) { if (CurrentKey != Key || CurrentRecordValue.GetSize() == 0) { ZenCacheValue RecordCacheValue; if (m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) { CurrentRecordValue = RecordCacheValue.Value; CurrentKey = Key; } } if (CurrentRecordValue) { Cid = GetCidFromValueId(ValueId, CbObjectView(CurrentRecordValue.GetData()), RawSize); } } CompressedBuffer Compressed; if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Cid)) { Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); RawSize = Compressed.GetRawSize(); } if (Compressed || RawSize > 0) { ResponseStream.Append([&Compressed, RequestIndex, &Cid, RawSize](CbPackage& Response, CbWriter& ResponseObject) { if (Compressed) { Response.AddAttachment(CbAttachment(std::move(Compressed))); } ResponseObject.BeginObject(); 
ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); ResponseObject.AddHash("RawHash"sv, Cid); ResponseObject.AddInteger("RawSize"sv, RawSize); ResponseObject.EndObject(); }); ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", Key.Bucket, Key.Hash, NiceBytes(RawSize), ToString(ZenContentType::kCompressedBinary)); } else { ResponseStream.Append([RequestIndex](CbPackage&, CbWriter& ResponseObject) { ResponseObject.BeginObject(); ResponseObject.AddInteger("RequestIndex"sv, RequestIndex); ResponseObject.EndObject(); }); ZEN_DEBUG("MISS - '{}/{}' '{}'", Key.Bucket, Key.Hash, ToString(ZenContentType::kCompressedBinary)); } } const bool Complete = ResponseStream.Flush(); ZEN_ASSERT(Complete); return true; } return false; } void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { // Query stats } break; case HttpVerb::kDelete: // Drop bucket if (m_CacheStore.DropBucket(Bucket)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } break; default: break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { HandleGetCacheRecord(Request, Ref, PolicyFromURL); } break; case HttpVerb::kPut: HandlePutCacheRecord(Request, Ref, PolicyFromURL); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { const ZenContentType AcceptType = Request.AcceptContentType(); const bool SkipData = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData); const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord); bool Success = false; ZenCacheValue ClientResultValue; if (!EnumHasAnyFlags(PolicyFromURL, 
CachePolicy::Query))
	{
		// The policy asks for no lookup at all: answer OK without a body
		return Request.WriteResponse(HttpResponseCode::OK);
	}

	// Local lookup; only attempted when the policy includes QueryLocal
	if (EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal) && m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, ClientResultValue))
	{
		Success					   = true;
		ZenContentType ContentType = ClientResultValue.Value.GetContentType();

		if (AcceptType == ZenContentType::kCbPackage)
		{
			// A package can only be assembled from a stored compact binary object
			if (ContentType == ZenContentType::kCbObject)
			{
				CbPackage Package;
				uint32_t  MissingCount = 0;

				CbObjectView CacheRecord(ClientResultValue.Value.Data());
				CacheRecord.IterateAttachments([this, &MissingCount, &Package, SkipData](CbFieldView AttachmentHash) {
					if (SkipData)
					{
						// Only check that the chunk exists; nothing is attached to the package
						if (!m_CidStore.ContainsChunk(AttachmentHash.AsHash()))
						{
							MissingCount++;
						}
					}
					else
					{
						if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
						{
							Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
						}
						else
						{
							MissingCount++;
						}
					}
				});

				// Missing attachments are tolerated only when the policy allows partial records
				Success = MissingCount == 0 || PartialRecord;

				if (Success)
				{
					Package.SetObject(LoadCompactBinaryObject(ClientResultValue.Value));

					BinaryWriter MemStream;
					Package.Save(MemStream);

					// Replace the stored object with the serialized package for the response
					ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
					ClientResultValue.Value.SetContentType(HttpContentType::kCbPackage);
				}
			}
			else
			{
				Success = false;
			}
		}
		else if (AcceptType != ClientResultValue.Value.GetContentType() && AcceptType != ZenContentType::kUnknownContentType &&
				 AcceptType != ZenContentType::kBinary)
		{
			// The stored content type does not match a specific (non-binary, known) accept type
			Success = false;
		}
	}

	if (Success)
	{
		ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
				  Ref.BucketSegment,
				  Ref.HashKey,
				  NiceBytes(ClientResultValue.Value.Size()),
				  ToString(ClientResultValue.Value.GetContentType()));

		m_CacheStats.HitCount++;

		if (SkipData && AcceptType != ZenContentType::kCbPackage && AcceptType != ZenContentType::kCbObject)
		{
			return Request.WriteResponse(HttpResponseCode::OK);
		}
		else
		{
			// kCbPackage handled SkipData when constructing the ClientResultValue, kcbObject ignores SkipData
			return Request.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value);
		}
	}
	else if (!EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryRemote))
	{
		// No local result and the policy does not permit an upstream query
		ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
		m_CacheStats.MissCount++;
		return Request.WriteResponse(HttpResponseCode::NotFound);
	}

	// Issue upstream query asynchronously in order to keep requests flowing without
	// hogging I/O servicing threads with blocking work

	// Ref and the policy are captured by value; the lambda re-derives its own policy flags
	Request.WriteResponseAsync([this, AcceptType, PolicyFromURL, Ref](HttpServerRequest& AsyncRequest) {
		bool	   Success		 = false;
		const bool PartialRecord = EnumHasAllFlags(PolicyFromURL, CachePolicy::PartialRecord);
		const bool QueryLocal	 = EnumHasAllFlags(PolicyFromURL, CachePolicy::QueryLocal);
		const bool StoreLocal	 = EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreLocal);
		const bool SkipData		 = EnumHasAllFlags(PolicyFromURL, CachePolicy::SkipData);

		ZenCacheValue ClientResultValue;

		metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming);

		if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType);
			UpstreamResult.Success)
		{
			Success = true;

			ClientResultValue.Value = UpstreamResult.Value;
			ClientResultValue.Value.SetContentType(AcceptType);

			if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject)
			{
				if (AcceptType == ZenContentType::kCbObject)
				{
					// Objects from upstream are validated before being stored or returned
					const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
					if (ValidationResult != CbValidateError::None)
					{
						Success = false;
						ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid compact binary object from upstream",
								 Ref.BucketSegment,
								 Ref.HashKey,
								 ToString(AcceptType));
					}

					// We do not do anything to the returned object for SkipData, only package attachments are cut when skipping data
				}

				if (Success && StoreLocal)
				{
					m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, ClientResultValue);
				}
			}
			else if (AcceptType == ZenContentType::kCbPackage)
			{
				CbPackage Package;
				if (Package.TryLoad(ClientResultValue.Value))
				{
					CbObject		CacheRecord = Package.GetObject();
					AttachmentCount Count;

					// Classify every attachment the record references: present in the package
					// (optionally stored locally), or - with QueryLocal - filled in from the CID store
					CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
						if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
						{
							if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
							{
								if (StoreLocal)
								{
									auto InsertResult = m_CidStore.AddChunk(Compressed);
									if (InsertResult.New)
									{
										Count.New++;
									}
								}
								Count.Valid++;
							}
							else
							{
								ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
										 HashView.AsHash(),
										 Ref.BucketSegment,
										 Ref.HashKey);
								Count.Invalid++;
							}
						}
						else if (QueryLocal)
						{
							if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
							{
								Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
								Count.Valid++;
							}
						}
						Count.Total++;
					});

					if ((Count.Valid == Count.Total) || PartialRecord)
					{
						// Only the record object (not the package) is written to the local cache store
						ZenCacheValue CacheValue;
						CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
						CacheValue.Value.SetContentType(ZenContentType::kCbObject);

						if (StoreLocal)
						{
							m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
						}

						BinaryWriter MemStream;
						if (SkipData)
						{
							// Save a package containing only the object.
							CbPackage(Package.GetObject()).Save(MemStream);
						}
						else
						{
							Package.Save(MemStream);
						}

						ClientResultValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
						ClientResultValue.Value.SetContentType(ZenContentType::kCbPackage);
					}
					else
					{
						Success = false;
						ZEN_WARN("Get - '{}/{}' '{}' FAILED, attachments missing in upstream package",
								 Ref.BucketSegment,
								 Ref.HashKey,
								 ToString(AcceptType));
					}
				}
				else
				{
					Success = false;
					ZEN_WARN("Get - '{}/{}' '{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType));
				}
			}
		}

		if (Success)
		{
			ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)",
					  Ref.BucketSegment,
					  Ref.HashKey,
					  NiceBytes(ClientResultValue.Value.Size()),
					  ToString(ClientResultValue.Value.GetContentType()));

			m_CacheStats.HitCount++;
			m_CacheStats.UpstreamHitCount++;

			if (SkipData && AcceptType == ZenContentType::kBinary)
			{
				AsyncRequest.WriteResponse(HttpResponseCode::OK);
			}
			else
			{
				// Other methods modify ClientResultValue to a version that has skipped the data but keeps the Object and optionally
				// metadata.
AsyncRequest.WriteResponse(HttpResponseCode::OK, ClientResultValue.Value.GetContentType(), ClientResultValue.Value); } } else { ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); m_CacheStats.MissCount++; AsyncRequest.WriteResponse(HttpResponseCode::NotFound); } }); } void HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); Body.SetContentType(ContentType); if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kCompressedBinary) { ZEN_DEBUG("PUT - '{}/{}' {} '{}'", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType)); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (EnumHasAllFlags(PolicyFromURL, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ContentType, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } CachePolicy Policy = PolicyFromURL; CbObjectView CacheRecord(Body.Data()); std::vector ValidAttachments; int32_t TotalCount = 0; CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } 
TotalCount++; }); ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ToString(ContentType), TotalCount, ValidAttachments.size()); Body.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); const bool IsPartialRecord = TotalCount != static_cast(ValidAttachments.size()); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { CbPackage Package; if (!Package.TryLoad(Body)) { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey, ToString(ContentType)); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CachePolicy Policy = PolicyFromURL; CbObject CacheRecord = Package.GetObject(); AttachmentCount Count; std::vector ValidAttachments; ValidAttachments.reserve(Package.GetAttachments().size()); CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { const IoHash Hash = HashView.AsHash(); if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(InsertResult.DecompressedId); if (InsertResult.New) { Count.New++; } Count.Valid++; } else { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), Hash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); Count.Valid++; } Count.Total++; }); if 
(Count.Invalid > 0) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), Count.New, Count.Valid, Count.Total); ZenCacheValue CacheValue; CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer(); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .ValueContentIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Content-Type invalid"sv); } } void HttpStructuredCacheService::HandleCacheValueRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: HandleGetCacheValue(Request, Ref, PolicyFromURL); break; case HttpVerb::kPut: HandlePutCacheValue(Request, Ref, PolicyFromURL); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { IoBuffer Value = m_CidStore.FindChunkByCid(Ref.ValueContentId); bool InUpstreamCache = false; CachePolicy Policy = PolicyFromURL; const bool QueryUpstream = !Value && EnumHasAllFlags(Policy, CachePolicy::QueryRemote); if (QueryUpstream) { if (auto UpstreamResult = m_UpstreamCache.GetCacheValue({Ref.BucketSegment, Ref.HashKey}, Ref.ValueContentId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { 
m_CidStore.AddChunk(Compressed); InUpstreamCache = true; } else { ZEN_WARN("got uncompressed upstream cache value"); } } } if (!Value) { ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, ToString(Request.AcceptContentType())); m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Value.Size()), ToString(Value.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); m_CacheStats.HitCount++; if (InUpstreamCache) { m_CacheStats.UpstreamHitCount++; } if (EnumHasAllFlags(Policy, CachePolicy::SkipData)) { Request.WriteResponse(HttpResponseCode::OK); } else { Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Value); } } void HttpStructuredCacheService::HandlePutCacheValue(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromURL) { // Note: Individual cacherecord values are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(PolicyFromURL); IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Body.SetContentType(Request.RequestContentType()); CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.ValueContentId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "ValueContentId does not match attachment hash"sv); } CidStore::InsertResult Result = m_CidStore.AddChunk(Compressed); ZEN_DEBUG("PUT - '{}/{}/{}' {} '{}' ({})", Ref.BucketSegment, Ref.HashKey, Ref.ValueContentId, NiceBytes(Body.Size()), ToString(Body.GetContentType()), Result.New ? 
"NEW" : "OLD"); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } bool HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; } std::string_view HashSegment; std::string_view ValueSegment; std::string_view::size_type ValueSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (ValueSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + valueid lookup HashSegment = Key.substr(BucketSplitOffset + 1, ValueSplitOffset - BucketSplitOffset - 1); ValueSegment = Key.substr(ValueSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) { return false; } if (!ValueSegment.empty() && ValueSegment.size() == IoHash::StringLength) { const bool IsOk = ParseHexBytes(ValueSegment.data(), ValueSegment.size(), OutRef.ValueContentId.Hash); if (!IsOk) { return false; } } else { OutRef.ValueContentId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } void HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) { switch (Request.RequestVerb()) { case HttpVerb::kPost: { const HttpContentType ContentType = Request.RequestContentType(); const HttpContentType AcceptType = Request.AcceptContentType(); if ((ContentType != HttpContentType::kCbObject && ContentType != HttpContentType::kCbPackage) || AcceptType != 
HttpContentType::kCbPackage) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { CbPackage Package; CbObjectView Object; CbObject ObjectBuffer; if (ContentType == HttpContentType::kCbObject) { ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body)); Object = ObjectBuffer; } else { Package = ParsePackageMessage(Body); Object = Package.GetObject(); } const std::string_view Method = Object["Method"sv].AsString(); if (Method == "PutCacheRecords"sv) { HandleRpcPutCacheRecords(AsyncRequest, Package); } else if (Method == "GetCacheRecords"sv) { HandleRpcGetCacheRecords(AsyncRequest, Object); } else if (Method == "PutCacheValues"sv) { HandleRpcPutCacheValues(AsyncRequest, Package); } else if (Method == "GetCacheValues"sv) { HandleRpcGetCacheValues(AsyncRequest, Object); } else if (Method == "GetCacheChunks"sv) { HandleRpcGetCacheChunks(AsyncRequest, Object); } else { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); } }); } break; default: Request.WriteResponse(HttpResponseCode::BadRequest); break; } } void HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheRecords"); CbObjectView BatchObject = BatchRequest.GetObject(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); CachePolicy DefaultPolicy; ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); DefaultPolicy = !PolicyText.empty() ? 
ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView RecordObject = RequestObject["Record"sv].AsObjectView(); CbObjectView KeyView = RecordObject["Key"sv].AsObjectView(); CbFieldView BucketField = KeyView["Bucket"sv]; CbFieldView HashField = KeyView["Hash"sv]; CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); PutRequestData PutRequest{std::move(Key), RecordObject, std::move(Policy)}; PutResult Result = PutCacheRecord(PutRequest, &BatchRequest); if (Result == PutResult::Invalid) { return Request.WriteResponse(HttpResponseCode::BadRequest); } Results.push_back(Result == PutResult::Success); } if (Results.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { ResponseObject.AddBool(Value); } ResponseObject.EndArray(); CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } HttpStructuredCacheService::PutResult HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package) { std::vector ValidAttachments; AttachmentCount Count; CbObjectView Record = Request.RecordObject; uint64_t RecordObjectSize = Record.GetSize(); uint64_t TransferredSize = RecordObjectSize; Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) { const IoHash 
ValueHash = HashView.AsHash(); if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(InsertResult.DecompressedId); if (InsertResult.New) { Count.New++; } Count.Valid++; TransferredSize += Chunk.GetCompressedSize(); } else { ZEN_WARN("PUT - '{}/{}' '{}' FAILED, attachment '{}' is not compressed", Request.Key.Bucket, Request.Key.Hash, ToString(HttpContentType::kCbPackage), ValueHash); Count.Invalid++; } } else if (m_CidStore.ContainsChunk(ValueHash)) { ValidAttachments.emplace_back(ValueHash); Count.Valid++; } Count.Total++; }); if (Count.Invalid > 0) { return PutResult::Invalid; } ZEN_DEBUG("PUT - '{}/{}' {}, attachments '{}/{}/{}' (new/valid/total)", Request.Key.Bucket, Request.Key.Hash, NiceBytes(TransferredSize), Count.New, Count.Valid, Count.Total); ZenCacheValue CacheValue; CacheValue.Value = IoBuffer(Record.GetSize()); Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize())); CacheValue.Value.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Request.Key.Bucket, Request.Key.Hash, CacheValue); const bool IsPartialRecord = Count.Valid != Count.Total; if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord) { m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCbPackage, .Key = Request.Key, .ValueContentIds = std::move(ValidAttachments)}); } return PutResult::Success; } void HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheRecords"); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); struct ValueRequestData { Oid ValueId; IoHash ContentId; CompressedBuffer 
Payload; CachePolicy DownstreamPolicy; bool Exists = false; bool ReadFromUpstream = false; }; struct RecordRequestData { CacheKeyRequest Upstream; CbObjectView RecordObject; IoBuffer RecordCacheValue; CacheRecordPolicy DownstreamPolicy; std::vector Values; bool Complete = false; bool UsedUpstream = false; }; std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector Requests; std::vector UpstreamIndexes; CbArrayView RequestsArray = Params["Requests"sv].AsArrayView(); Requests.reserve(RequestsArray.Num()); auto ParseValues = [](RecordRequestData& Request) { CbArrayView ValuesArray = Request.RecordObject["Values"sv].AsArrayView(); Request.Values.reserve(ValuesArray.Num()); for (CbFieldView ValueField : ValuesArray) { CbObjectView ValueObject = ValueField.AsObjectView(); Oid ValueId = ValueObject["Id"sv].AsObjectId(); CbFieldView RawHashField = ValueObject["RawHash"sv]; IoHash RawHash = RawHashField.AsBinaryAttachment(); if (ValueId && !RawHashField.HasError()) { Request.Values.push_back({ValueId, RawHash}); Request.Values.back().DownstreamPolicy = Request.DownstreamPolicy.GetValuePolicy(ValueId); } } }; for (CbFieldView RequestField : RequestsArray) { RecordRequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView BucketField = KeyObject["Bucket"sv]; CbFieldView HashField = KeyObject["Hash"sv]; CacheKey& Key = Request.Upstream.Key; Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); if (HashField.HasError() || Key.Bucket.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy); const CacheRecordPolicy& Policy = Request.DownstreamPolicy; ZenCacheValue CacheValue; bool 
NeedUpstreamAttachment = false; bool FoundLocalInvalid = false; ZenCacheValue RecordCacheValue; if (EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryLocal) && m_CacheStore.Get(Key.Bucket, Key.Hash, RecordCacheValue)) { Request.RecordCacheValue = std::move(RecordCacheValue.Value); if (Request.RecordCacheValue.GetContentType() != ZenContentType::kCbObject) { FoundLocalInvalid = true; } else { Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); Request.Complete = true; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryLocal)) { // A value that is requested without the Query flag (such as None/Disable) counts as existing, because we // didn't ask for it and thus the record is complete in its absence. if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { Value.Exists = true; } else { NeedUpstreamAttachment = true; Value.ReadFromUpstream = true; Request.Complete = false; } } else if (EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { if (m_CidStore.ContainsChunk(Value.ContentId)) { Value.Exists = true; } else { if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; Value.ReadFromUpstream = true; } Request.Complete = false; } } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) { ZEN_ASSERT(Chunk.GetSize() > 0); Value.Payload = CompressedBuffer::FromCompressed(SharedBuffer(Chunk)); Value.Exists = true; } else { if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; Value.ReadFromUpstream = true; } Request.Complete = false; } } } } } if (!Request.Complete) { bool NeedUpstreamRecord = !Request.RecordObject && !FoundLocalInvalid && EnumHasAllFlags(Policy.GetRecordPolicy(), CachePolicy::QueryRemote); if (NeedUpstreamRecord || NeedUpstreamAttachment) { UpstreamIndexes.push_back(Requests.size() - 1); } } } if 
(Requests.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } if (!UpstreamIndexes.empty()) { std::vector UpstreamRequests; UpstreamRequests.reserve(UpstreamIndexes.size()); for (size_t Index : UpstreamIndexes) { RecordRequestData& Request = Requests[Index]; UpstreamRequests.push_back(&Request.Upstream); if (Request.Values.size()) { // We will be returning the local object and know all the value Ids that exist in it // Convert all their Downstream Values to upstream values, and add SkipData to any ones that we already have. CachePolicy UpstreamBasePolicy = ConvertToUpstream(Request.DownstreamPolicy.GetBasePolicy()) | CachePolicy::SkipMeta; CacheRecordPolicyBuilder Builder(UpstreamBasePolicy); for (ValueRequestData& Value : Request.Values) { CachePolicy UpstreamPolicy = ConvertToUpstream(Value.DownstreamPolicy); UpstreamPolicy |= !Value.ReadFromUpstream ? CachePolicy::SkipData : CachePolicy::None; Builder.AddValuePolicy(Value.ValueId, UpstreamPolicy); } Request.Upstream.Policy = Builder.Build(); } else { // We don't know which Values exist in the Record; ask the upstrem for all values that the client wants, // and convert the CacheRecordPolicy to an upstream policy Request.Upstream.Policy = Request.DownstreamPolicy.ConvertToUpstream(); } } const auto OnCacheRecordGetComplete = [this, &ParseValues](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } RecordRequestData& Request = *reinterpret_cast(reinterpret_cast(&Params.Request) - offsetof(RecordRequestData, Upstream)); const CacheKey& Key = Request.Upstream.Key; if (!Request.RecordObject) { CbObject ObjectBuffer = CbObject::Clone(Params.Record); Request.RecordCacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Request.RecordCacheValue.SetContentType(ZenContentType::kCbObject); Request.RecordObject = ObjectBuffer; if (EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::StoreLocal)) { m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = 
{Request.RecordCacheValue}}); } ParseValues(Request); Request.UsedUpstream = true; } Request.Complete = true; for (ValueRequestData& Value : Request.Values) { if (Value.Exists) { continue; } CachePolicy ValuePolicy = Value.DownstreamPolicy; if (!EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { Request.Complete = false; continue; } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData) || EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { if (const CbAttachment* Attachment = Params.Package.FindAttachment(Value.ContentId)) { if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) { Request.UsedUpstream = true; Value.Exists = true; if (EnumHasAllFlags(ValuePolicy, CachePolicy::StoreLocal)) { m_CidStore.AddChunk(Compressed); } if (!EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { Value.Payload = Compressed; } } else { ZEN_DEBUG("Uncompressed value '{}' from upstream cache record '{}/{}'", Value.ContentId, Key.Bucket, Key.Hash); } } if (!Value.Exists && !EnumHasAllFlags(ValuePolicy, CachePolicy::SkipData)) { Request.Complete = false; } // Request.Complete does not need to be set to false for upstream SkipData attachments. // In the PartialRecord==false case, the upstream will have failed the entire record if any SkipData attachment // didn't exist and we will not get here. In the PartialRecord==true case, we do not need to inform the client of // any missing SkipData attachments. 
} } }; m_UpstreamCache.GetCacheRecords(UpstreamRequests, std::move(OnCacheRecordGetComplete)); } CbPackage ResponsePackage; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (RecordRequestData& Request : Requests) { const CacheKey& Key = Request.Upstream.Key; if (Request.Complete || (Request.RecordObject && EnumHasAllFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::PartialRecord))) { ResponseObject << Request.RecordObject; for (ValueRequestData& Value : Request.Values) { if (!EnumHasAllFlags(Value.DownstreamPolicy, CachePolicy::SkipData) && Value.Payload) { ResponsePackage.AddAttachment(CbAttachment(Value.Payload)); } } ZEN_DEBUG("HIT - '{}/{}' {}{}{}", Key.Bucket, Key.Hash, NiceBytes(Request.RecordCacheValue.Size()), Request.Complete ? ""sv : " (PARTIAL)"sv, Request.UsedUpstream ? " (UPSTREAM)"sv : ""sv); m_CacheStats.HitCount++; m_CacheStats.UpstreamHitCount += Request.UsedUpstream ? 1 : 0; } else { ResponseObject.AddNull(); if (!EnumHasAnyFlags(Request.DownstreamPolicy.GetRecordPolicy(), CachePolicy::Query)) { // If they requested no query, do not record this as a miss ZEN_DEBUG("DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); } else { ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, Request.RecordObject ? 
""sv : "(PARTIAL)"sv); m_CacheStats.MissCount++; } } } ResponseObject.EndArray(); ResponsePackage.SetObject(ResponseObject.Save()); BinaryWriter MemStream; ResponsePackage.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest) { ZEN_TRACE_CPU("Z$::RpcPutCacheValues"); CbObjectView BatchObject = BatchRequest.GetObject(); CbObjectView Params = BatchObject["Params"sv].AsObjectView(); ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv); std::string_view PolicyText = Params["DefaultPolicy"].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; std::vector Results; for (CbFieldView RequestField : Params["Requests"sv]) { CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyView = RequestObject["Key"sv].AsObjectView(); CbFieldView BucketField = KeyView["Bucket"sv]; CbFieldView HashField = KeyView["Hash"sv]; CacheKey Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Key.Bucket.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } PolicyText = RequestObject["Policy"sv].AsString(); CachePolicy Policy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; IoHash RawHash = RequestObject["RawHash"sv].AsBinaryAttachment(); bool Succeeded = false; uint64_t TransferredSize = 0; if (const CbAttachment* Attachment = BatchRequest.FindAttachment(RawHash)) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { // TODO: Implement upstream puts of CacheValues with StoreLocal == false. 
// Currently ProcessCacheRecord requires that the value exist in the local cache to put it upstream. Policy |= CachePolicy::StoreLocal; } if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { IoBuffer Value = Chunk.GetCompressed().Flatten().AsIoBuffer(); Value.SetContentType(ZenContentType::kCompressedBinary); m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Value}); TransferredSize = Chunk.GetCompressedSize(); } Succeeded = true; } else { ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}' FAILED, value is not compressed", Key.Bucket, Key.Hash, RawHash); return Request.WriteResponse(HttpResponseCode::BadRequest); } } else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { ZenCacheValue ExistingValue; if (m_CacheStore.Get(Key.Bucket, Key.Hash, ExistingValue) && IsCompressedBinary(ExistingValue.Value.GetContentType())) { Succeeded = true; } } // We do not search the Upstream. No data in a put means the caller is probing for whether they need to do a heavy put. // If it doesn't exist locally they should do the heavy put rather than having us fetch it from upstream. if (Succeeded && EnumHasAllFlags(Policy, CachePolicy::StoreRemote)) { m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCompressedBinary, .Key = Key}); } Results.push_back(Succeeded); ZEN_DEBUG("PUTCACHEVALUES - '{}/{}' {}, '{}'", Key.Bucket, Key.Hash, NiceBytes(TransferredSize), Succeeded ? 
"Added"sv : "Invalid"); } if (Results.empty()) { return Request.WriteResponse(HttpResponseCode::BadRequest); } CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (bool Value : Results) { ResponseObject.AddBool(Value); } ResponseObject.EndArray(); CbPackage RpcResponse; RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { ZEN_TRACE_CPU("Z$::RpcGetCacheValues"); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view PolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default; struct RequestData { CacheKey Key; CachePolicy Policy; CompressedBuffer Result; }; std::vector Requests; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv); for (CbFieldView RequestField : Params["Requests"sv]) { RequestData& Request = Requests.emplace_back(); CbObjectView RequestObject = RequestField.AsObjectView(); CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView BucketField = KeyObject["Bucket"sv]; CbFieldView HashField = KeyObject["Hash"sv]; Request.Key = CacheKey::Create(BucketField.AsString(), HashField.AsHash()); if (BucketField.HasError() || HashField.HasError() || Request.Key.Bucket.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } PolicyText = RequestObject["Policy"sv].AsString(); Request.Policy = !PolicyText.empty() ? 
ParseCachePolicy(PolicyText) : DefaultPolicy; CacheKey& Key = Request.Key; CachePolicy Policy = Request.Policy; CompressedBuffer& Result = Request.Result; ZenCacheValue CacheValue; std::string_view Source; if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal)) { if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue) && IsCompressedBinary(CacheValue.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); if (Result) { Source = "LOCAL"sv; } } } if (!Result && EnumHasAllFlags(Policy, CachePolicy::QueryRemote)) { GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Key.Bucket, Key.Hash}, ZenContentType::kCompressedBinary); if (UpstreamResult.Success && IsCompressedBinary(UpstreamResult.Value.GetContentType())) { Result = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)); if (Result) { UpstreamResult.Value.SetContentType(ZenContentType::kCompressedBinary); Source = "UPSTREAM"sv; // TODO: Respect the StoreLocal flag once we have upstream existence-only checks. For now the requirement // that we copy data from upstream even when SkipData and !StoreLocal are true means that it is too expensive // for us to keep the data only on the upstream server. 
// if (EnumHasAllFlags(Policy, CachePolicy::StoreLocal)) { m_CacheStore.Put(Key.Bucket, Key.Hash, ZenCacheValue{UpstreamResult.Value}); } } } } if (Result) { ZEN_DEBUG("GETCACHEVALUES HIT - '{}/{}' {} ({})", Key.Bucket, Key.Hash, NiceBytes(Result.GetCompressed().GetSize()), Source); m_CacheStats.HitCount++; } else if (!EnumHasAnyFlags(Policy, CachePolicy::Query)) { // If they requested no query, do not record this as a miss ZEN_DEBUG("GETCACHEVALUES DISABLEDQUERY - '{}/{}'", Key.Bucket, Key.Hash); } else { ZEN_DEBUG("GETCACHEVALUES MISS - '{}/{}'", Key.Bucket, Key.Hash); m_CacheStats.MissCount++; } } if (Requests.empty()) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } CbPackage RpcResponse; CbObjectWriter ResponseObject; ResponseObject.BeginArray("Result"sv); for (const RequestData& Request : Requests) { ResponseObject.BeginObject(); { const CompressedBuffer& Result = Request.Result; if (Result) { ResponseObject.AddHash("RawHash"sv, IoHash::FromBLAKE3(Result.GetRawHash())); if (!EnumHasAllFlags(Request.Policy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Result)); } else { ResponseObject.AddInteger("RawSize"sv, Result.GetRawSize()); } } } ResponseObject.EndObject(); } ResponseObject.EndArray(); RpcResponse.SetObject(ResponseObject.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } namespace cache::detail { struct RecordValue { Oid ValueId; IoHash ContentId; uint64_t RawSize; }; struct RecordBody { IoBuffer CacheValue; std::vector Values; std::string_view Source; CachePolicy DownstreamPolicy; bool Exists = false; bool HasRequest = false; bool ValuesRead = false; }; struct ChunkRequest { CacheChunkRequest* Key = nullptr; RecordBody* Record = nullptr; CompressedBuffer Value; std::string_view Source; uint64_t TotalSize = 0; uint64_t RequestedSize = 0; uint64_t 
RequestedOffset = 0; CachePolicy DownstreamPolicy; bool Exists = false; bool TotalSizeKnown = false; bool IsRecordRequest = false; }; } // namespace cache::detail void HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest) { using namespace cache::detail; ZEN_TRACE_CPU("Z$::RpcGetCacheChunks"); std::vector RecordKeys; // Data about a Record necessary to identify it to the upstream std::vector Records; // Scratch-space data about a Record when fulfilling RecordRequests std::vector RequestKeys; // Data about a ChunkRequest necessary to identify it to the upstream std::vector Requests; // Intermediate and result data about a ChunkRequest std::vector RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key std::vector ValueRequests; // The ChunkRequests that are requesting a Value Key std::vector UpstreamChunks; // ChunkRequests that we need to send to the upstream // Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests if (!ParseGetCacheChunksRequest(RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest)) { return HttpRequest.WriteResponse(HttpResponseCode::BadRequest); } // For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we // have it locally, and otherwise append a request for the payload to UpstreamChunks GetLocalCacheRecords(RecordKeys, Records, RecordRequests, UpstreamChunks); // For each Value request, load the Value if we have it locally and otherwise append a request for the payload to UpstreamChunks GetLocalCacheValues(ValueRequests, UpstreamChunks); // Call GetCacheChunks on the upstream for any payloads we do not have locally GetUpstreamCacheChunks(UpstreamChunks, RequestKeys, Requests); // Send the payload and descriptive data about each chunk to the client WriteGetCacheChunksResponse(Requests, HttpRequest); } bool 
HttpStructuredCacheService::ParseGetCacheChunksRequest(std::vector& RecordKeys, std::vector& Records, std::vector& RequestKeys, std::vector& Requests, std::vector& RecordRequests, std::vector& ValueRequests, CbObjectView RpcRequest) { using namespace cache::detail; ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv); CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString(); CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default; CbArrayView ChunkRequestsArray = Params["ChunkRequests"sv].AsArrayView(); size_t NumRequests = static_cast(ChunkRequestsArray.Num()); // Note that these reservations allow us to take pointers to the elements while populating them. If the reservation is removed, // we will need to change the pointers to indexes to handle reallocations. RecordKeys.reserve(NumRequests); Records.reserve(NumRequests); RequestKeys.reserve(NumRequests); Requests.reserve(NumRequests); RecordRequests.reserve(NumRequests); ValueRequests.reserve(NumRequests); CacheKeyRequest* PreviousRecordKey = nullptr; RecordBody* PreviousRecord = nullptr; for (CbFieldView RequestView : ChunkRequestsArray) { CbObjectView RequestObject = RequestView.AsObjectView(); CacheChunkRequest& RequestKey = RequestKeys.emplace_back(); ChunkRequest& Request = Requests.emplace_back(); Request.Key = &RequestKey; CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); CbFieldView HashField = KeyObject["Hash"sv]; RequestKey.Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), HashField.AsHash()); if (RequestKey.Key.Bucket.empty() || HashField.HasError()) { ZEN_WARN("GetCacheChunks: Invalid key in ChunkRequest."); return false; } RequestKey.ChunkId = RequestObject["ChunkId"sv].AsHash(); RequestKey.ValueId = RequestObject["ValueId"sv].AsObjectId(); RequestKey.RawOffset = RequestObject["RawOffset"sv].AsUInt64(); RequestKey.RawSize = 
RequestObject["RawSize"sv].AsUInt64(UINT64_MAX); Request.RequestedSize = RequestKey.RawSize; Request.RequestedOffset = RequestKey.RawOffset; std::string_view PolicyText = RequestObject["Policy"sv].AsString(); Request.DownstreamPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : DefaultPolicy; Request.IsRecordRequest = (bool)RequestKey.ValueId; if (!Request.IsRecordRequest) { ValueRequests.push_back(&Request); } else { RecordRequests.push_back(&Request); CacheKeyRequest* RecordKey = nullptr; RecordBody* Record = nullptr; if (!PreviousRecordKey || PreviousRecordKey->Key < RequestKey.Key) { RecordKey = &RecordKeys.emplace_back(); PreviousRecordKey = RecordKey; Record = &Records.emplace_back(); PreviousRecord = Record; RecordKey->Key = RequestKey.Key; } else if (RequestKey.Key == PreviousRecordKey->Key) { RecordKey = PreviousRecordKey; Record = PreviousRecord; } else { ZEN_WARN("GetCacheChunks: Keys in ChunkRequest are not sorted: {}/{} came after {}/{}.", RequestKey.Key.Bucket, RequestKey.Key.Hash, PreviousRecordKey->Key.Bucket, PreviousRecordKey->Key.Hash); return false; } Request.Record = Record; if (RequestKey.ChunkId == RequestKey.ChunkId.Zero) { Record->DownstreamPolicy = Record->HasRequest ? 
Union(Record->DownstreamPolicy, Request.DownstreamPolicy) : Request.DownstreamPolicy; Record->HasRequest = true; } } } if (Requests.empty()) { return false; } return true; } void HttpStructuredCacheService::GetLocalCacheRecords(std::vector& RecordKeys, std::vector& Records, std::vector& RecordRequests, std::vector& OutUpstreamChunks) { using namespace cache::detail; std::vector UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { CacheKeyRequest& RecordKey = RecordKeys[RecordIndex]; RecordBody& Record = Records[RecordIndex]; if (Record.HasRequest) { Record.DownstreamPolicy |= CachePolicy::SkipData | CachePolicy::SkipMeta; if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; if (m_CacheStore.Get(RecordKey.Key.Bucket, RecordKey.Key.Hash, CacheValue)) { Record.Exists = true; Record.CacheValue = std::move(CacheValue.Value); Record.Source = "LOCAL"sv; } } if (!Record.Exists && EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::QueryRemote)) { RecordKey.Policy = CacheRecordPolicy(ConvertToUpstream(Record.DownstreamPolicy)); UpstreamRecordRequests.push_back(&RecordKey); } } } if (!UpstreamRecordRequests.empty()) { const auto OnCacheRecordGetComplete = [this, &RecordKeys, &Records](CacheRecordGetCompleteParams&& Params) { if (!Params.Record) { return; } CacheKeyRequest& RecordKey = Params.Request; size_t RecordIndex = std::distance(RecordKeys.data(), &RecordKey); RecordBody& Record = Records[RecordIndex]; const CacheKey& Key = RecordKey.Key; Record.Exists = true; CbObject ObjectBuffer = CbObject::Clone(Params.Record); Record.CacheValue = ObjectBuffer.GetBuffer().AsIoBuffer(); Record.CacheValue.SetContentType(ZenContentType::kCbObject); Record.Source = "UPSTREAM"sv; if (EnumHasAllFlags(Record.DownstreamPolicy, CachePolicy::StoreLocal)) { m_CacheStore.Put(Key.Bucket, Key.Hash, {.Value = Record.CacheValue}); } }; 
m_UpstreamCache.GetCacheRecords(UpstreamRecordRequests, std::move(OnCacheRecordGetComplete)); } std::vector UpstreamPayloadRequests; for (ChunkRequest* Request : RecordRequests) { if (Request->Key->ChunkId == IoHash::Zero) { // Unreal uses a 12 byte ID to address cache record values. When the uncompressed hash (ChunkId) // is missing, parse the cache record and try to find the raw hash from the ValueId. RecordBody& Record = *Request->Record; if (!Record.ValuesRead) { Record.ValuesRead = true; if (Record.CacheValue && Record.CacheValue.GetContentType() == ZenContentType::kCbObject) { CbObjectView RecordObject = CbObjectView(Record.CacheValue.GetData()); CbArrayView ValuesArray = RecordObject["Values"sv].AsArrayView(); Record.Values.reserve(ValuesArray.Num()); for (CbFieldView ValueField : ValuesArray) { CbObjectView ValueObject = ValueField.AsObjectView(); Oid ValueId = ValueObject["Id"sv].AsObjectId(); CbFieldView RawHashField = ValueObject["RawHash"sv]; IoHash RawHash = RawHashField.AsBinaryAttachment(); if (ValueId && !RawHashField.HasError()) { Record.Values.push_back({ValueId, RawHash, ValueObject["RawSize"sv].AsUInt64()}); } } } } for (const RecordValue& Value : Record.Values) { if (Value.ValueId == Request->Key->ValueId) { Request->Key->ChunkId = Value.ContentId; Request->TotalSize = Value.RawSize; Request->TotalSizeKnown = true; break; } } } // Now load the ContentId from the local ContentIdStore or from the upstream if (Request->Key->ChunkId != IoHash::Zero) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData) && Request->TotalSizeKnown) { if (m_CidStore.ContainsChunk(Request->Key->ChunkId)) { Request->Exists = true; Request->Source = "LOCAL"sv; } } else if (IoBuffer Payload = m_CidStore.FindChunkByCid(Request->Key->ChunkId)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload)); if (Compressed) { if 
(!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Compressed; } Request->Exists = true; Request->TotalSize = Compressed.GetRawSize(); Request->TotalSizeKnown = true; Request->Source = "LOCAL"sv; } } } if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { Request->Key->Policy = ConvertToUpstream(Request->DownstreamPolicy); OutUpstreamChunks.push_back(Request->Key); } } } } void HttpStructuredCacheService::GetLocalCacheValues(std::vector& ValueRequests, std::vector& OutUpstreamChunks) { using namespace cache::detail; for (ChunkRequest* Request : ValueRequests) { if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryLocal)) { ZenCacheValue CacheValue; if (m_CacheStore.Get(Request->Key->Key.Bucket, Request->Key->Key.Hash, CacheValue)) { if (IsCompressedBinary(CacheValue.Value.GetContentType())) { CompressedBuffer Result = CompressedBuffer::FromCompressed(SharedBuffer(CacheValue.Value)); if (Result) { if (!EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::SkipData)) { Request->Value = Result; } Request->Key->ChunkId = IoHash::FromBLAKE3(Result.GetRawHash()); Request->Exists = true; Request->TotalSize = Result.GetRawSize(); Request->TotalSizeKnown = true; Request->Source = "LOCAL"sv; } } } } if (!Request->Exists && EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::QueryRemote)) { if (EnumHasAllFlags(Request->DownstreamPolicy, CachePolicy::StoreLocal)) { // Convert the Offset,Size request into a request for the entire value; we will need it all to be able to store it locally Request->Key->RawOffset = 0; Request->Key->RawSize = UINT64_MAX; } OutUpstreamChunks.push_back(Request->Key); } } } void HttpStructuredCacheService::GetUpstreamCacheChunks(std::vector& UpstreamChunks, std::vector& RequestKeys, std::vector& Requests) { using namespace cache::detail; if (!UpstreamChunks.empty()) { const auto OnCacheValueGetComplete = [this, &RequestKeys, 
&Requests](CacheValueGetCompleteParams&& Params) { if (Params.RawHash == Params.RawHash.Zero) { return; } CacheChunkRequest& Key = Params.Request; size_t RequestIndex = std::distance(RequestKeys.data(), &Key); ChunkRequest& Request = Requests[RequestIndex]; if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal) || !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Value)); if (!Compressed || Compressed.GetRawSize() != Params.RawSize || IoHash::FromBLAKE3(Compressed.GetRawHash()) != Params.RawHash) { return; } if (EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::StoreLocal)) { if (Request.IsRecordRequest) { m_CidStore.AddChunk(Compressed); } else { m_CacheStore.Put(Key.Key.Bucket, Key.Key.Hash, {.Value = Params.Value}); } } if (!EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { Request.Value = std::move(Compressed); } } Key.ChunkId = Params.RawHash; Request.Exists = true; Request.TotalSize = Params.RawSize; Request.TotalSizeKnown = true; Request.Source = "UPSTREAM"sv; m_CacheStats.UpstreamHitCount++; }; m_UpstreamCache.GetCacheValues(UpstreamChunks, std::move(OnCacheValueGetComplete)); } } void HttpStructuredCacheService::WriteGetCacheChunksResponse(std::vector& Requests, zen::HttpServerRequest& HttpRequest) { using namespace cache::detail; CbPackage RpcResponse; CbObjectWriter Writer; Writer.BeginArray("Result"sv); for (ChunkRequest& Request : Requests) { Writer.BeginObject(); { if (Request.Exists) { Writer.AddHash("RawHash"sv, Request.Key->ChunkId); if (Request.Value && !EnumHasAllFlags(Request.DownstreamPolicy, CachePolicy::SkipData)) { RpcResponse.AddAttachment(CbAttachment(Request.Value)); } else { Writer.AddInteger("RawSize"sv, Request.TotalSize); } ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId, NiceBytes(Request.TotalSize), Request.IsRecordRequest ? 
"Record"sv : "Value"sv, Request.Source); m_CacheStats.HitCount++; } else if (!EnumHasAnyFlags(Request.DownstreamPolicy, CachePolicy::Query)) { ZEN_DEBUG("SKIP - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); } else { ZEN_DEBUG("MISS - '{}/{}/{}'", Request.Key->Key.Bucket, Request.Key->Key.Hash, Request.Key->ValueId); m_CacheStats.MissCount++; } } Writer.EndObject(); } Writer.EndArray(); RpcResponse.SetObject(Writer.Save()); BinaryWriter MemStream; RpcResponse.Save(MemStream); HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); } void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; EmitSnapshot("requests", m_HttpRequests, Cbo); EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); const uint64_t HitCount = m_CacheStats.HitCount; const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; const uint64_t MissCount = m_CacheStats.MissCount; const uint64_t TotalCount = HitCount + MissCount; const CasStoreSize CasSize = m_CidStore.CasSize(); const GcStorageSize CacheSize = m_CacheStore.StorageSize(); Cbo.BeginObject("cache"); Cbo.BeginObject("size"); Cbo << "disk" << CacheSize.DiskSize; Cbo << "memory" << CacheSize.MemorySize; Cbo.EndObject(); Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo << "hits" << HitCount << "misses" << MissCount; Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0); Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; Cbo << "upstream_ratio" << (HitCount > 0 ? 
(double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo.EndObject(); Cbo.BeginObject("upstream"); m_UpstreamCache.GetStatus(Cbo); Cbo.EndObject(); Cbo.BeginObject("cas"); Cbo.BeginObject("size"); Cbo << "tiny" << CasSize.TinySize; Cbo << "small" << CasSize.SmallSize; Cbo << "large" << CasSize.LargeSize; Cbo << "total" << CasSize.TotalSize; Cbo.EndObject(); Cbo.EndObject(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; Cbo << "ok" << true; Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } } // namespace zen