// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include #include #include #include #include #include #include namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// namespace detail { namespace cacheopt { constexpr std::string_view Local = "local"sv; constexpr std::string_view Remote = "remote"sv; constexpr std::string_view Data = "data"sv; constexpr std::string_view Meta = "meta"sv; constexpr std::string_view Value = "value"sv; constexpr std::string_view Attachments = "attachments"sv; }} // namespace detail::cacheopt ////////////////////////////////////////////////////////////////////////// enum class CachePolicy : uint8_t { None = 0, QueryLocal = 1 << 0, QueryRemote = 1 << 1, Query = QueryLocal | QueryRemote, StoreLocal = 1 << 2, StoreRemote = 1 << 3, Store = StoreLocal | StoreRemote, SkipMeta = 1 << 4, SkipValue = 1 << 5, SkipAttachments = 1 << 6, SkipData = SkipMeta | SkipValue | SkipAttachments, SkipLocalCopy = 1 << 7, Local = QueryLocal | StoreLocal, Remote = QueryRemote | StoreRemote, Default = Query | Store, Disable = None, }; gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { CachePolicy QueryPolicy = CachePolicy::Query; { std::string_view Opts = QueryParams.GetValue("query"sv); if (!Opts.empty()) { QueryPolicy = CachePolicy::None; ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { if (Opt == detail::cacheopt::Local) { QueryPolicy |= CachePolicy::QueryLocal; } if (Opt == detail::cacheopt::Remote) { QueryPolicy |= CachePolicy::QueryRemote; } return true; }); } } CachePolicy StorePolicy = CachePolicy::Store; { std::string_view Opts = QueryParams.GetValue("store"sv); if (!Opts.empty()) { StorePolicy = CachePolicy::None; ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { if (Opt == detail::cacheopt::Local) { StorePolicy |= CachePolicy::StoreLocal; } if (Opt == detail::cacheopt::Remote) { StorePolicy |= CachePolicy::StoreRemote; } return true; }); } } CachePolicy SkipPolicy = CachePolicy::None; { std::string_view Opts = QueryParams.GetValue("skip"sv); if (!Opts.empty()) { ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { if (Opt == detail::cacheopt::Meta) { SkipPolicy |= CachePolicy::SkipMeta; } if (Opt == detail::cacheopt::Value) { SkipPolicy |= CachePolicy::SkipValue; } if (Opt == detail::cacheopt::Attachments) { SkipPolicy |= CachePolicy::SkipAttachments; } if (Opt == detail::cacheopt::Data) { SkipPolicy |= CachePolicy::SkipData; } return true; }); } } return QueryPolicy | StorePolicy | SkipPolicy; } ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CasStore& InStore, CidStore& InCidStore, std::unique_ptr UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { } HttpStructuredCacheService::~HttpStructuredCacheService() { ZEN_INFO("closing structured cache"); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { if (m_LastScrubTime == Ctx.ScrubTimestamp()) { return; } m_LastScrubTime = Ctx.ScrubTimestamp(); m_CasStore.Scrub(Ctx); m_CidStore.Scrub(Ctx); m_CacheStore.Scrub(Ctx); } void HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference return HandleCacheBucketRequest(Request, Key); } return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } const auto QueryParams = Request.GetQueryParams(); CachePolicy Policy = ParseCachePolicy(QueryParams); if (Ref.PayloadId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref, Policy); } else { return HandleCachePayloadRequest(Request, Ref, Policy); } return; } void HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { ZEN_UNUSED(Request, Bucket); switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; case kHead: case kGet: { // Query stats } break; case kDelete: // Drop bucket if (m_CacheStore.DropBucket(Bucket)) { return Request.WriteResponse(HttpResponseCode::OK); } else { return Request.WriteResponse(HttpResponseCode::NotFound); } break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; case kHead: case kGet: { HandleGetCacheRecord(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } } break; case kPut: HandlePutCacheRecord(Request, Ref, Policy); break; default: break; } } void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { const ZenContentType AcceptType = Request.AcceptContentType(); ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); if (QueryUpstream) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) { Value.Value = UpstreamResult.Value; Success = true; InUpstreamCache = true; if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) { if (CacheRecordType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { CbObjectView CacheRecord(UpstreamResult.Value.Data()); CbObjectWriter IndexData; IndexData.BeginArray("references"); CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); Value.IndexData = IndexData.Save(); } else { Success = false; ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", Ref.BucketSegment, Ref.HashKey); } } if (Success) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } else { ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); CbPackage Package; if (Package.TryLoad(UpstreamResult.Value)) { uint32_t AttachmentCount = 0; uint32_t ValidCount = 0; CbObject CacheRecord = Package.GetObject(); CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) { if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) { m_CidStore.AddChunk(Chunk); ValidCount++; } else { ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", Ref.BucketSegment, Ref.HashKey); } } AttachmentCount++; }); if (ValidCount == AttachmentCount) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) { CbPackage PackageWithoutAttachments; PackageWithoutAttachments.SetObject(CacheRecord); BinaryWriter MemStream; PackageWithoutAttachments.Save(MemStream); Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } else { Success = false; ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", Ref.BucketSegment, Ref.HashKey); } } else { Success = false; ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); } } } } if (!Success) { ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); return Request.WriteResponse(HttpResponseCode::NotFound); } if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) { CbObjectView CacheRecord(Value.Value.Data()); const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); } const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); uint32_t AttachmentCount = 0; uint32_t ValidCount = 0; uint64_t AttachmentBytes = 0ull; CbPackage Package; if (!SkipAttachments) { CacheRecord.IterateAttachments( [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); AttachmentBytes += Chunk.Size(); ValidCount++; } AttachmentCount++; }); if (ValidCount != AttachmentCount) { ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", Ref.BucketSegment, Ref.HashKey, ValidCount, AttachmentCount); return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); } } Package.SetObject(LoadCompactBinaryObject(Value.Value)); ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments (LOCAL)", Ref.BucketSegment, Ref.HashKey, NiceBytes(AttachmentBytes + Value.Value.Size()), AttachmentCount); BinaryWriter MemStream; Package.Save(MemStream); IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else { ZEN_DEBUG("HIT - '{}/{}' {} ({})", Ref.BucketSegment, Ref.HashKey, NiceBytes(Value.Value.Size()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } void HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); if (StoreUpstream) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, Body.Size()); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } CbObjectView CacheRecord(Body.Data()); std::vector ValidAttachments; uint32_t AttachmentCount = 0; CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } AttachmentCount++; }); const uint32_t ValidCount = static_cast(ValidAttachments.size()); const bool ValidCacheRecord = ValidCount == AttachmentCount; if (ValidCacheRecord) { ZEN_DEBUG("PUT - cache record '{}/{}' {}, {} attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ValidCount); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); if (StoreUpstream) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { ZEN_WARN("PUT - cache record '{}/{}' FAILED, found {}/{} attachments", Ref.BucketSegment, Ref.HashKey, ValidCount, AttachmentCount); Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv); } } else if (ContentType == HttpContentType::kCbPackage) { CbPackage Package; if (!Package.TryLoad(Body)) { ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } CbObject CacheRecord = Package.GetObject(); std::span Attachments = Package.GetAttachments(); std::vector ValidAttachments; int32_t NewAttachmentCount = 0; ValidAttachments.reserve(Attachments.size()); CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) { if (Attachment->IsCompressedBinary()) { CompressedBuffer Chunk = Attachment->AsCompressedBinary(); CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); ValidAttachments.emplace_back(InsertResult.DecompressedId); if (InsertResult.New) { NewAttachmentCount++; } } else { ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); } } else { ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); } }); const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); if (!AttachmentsValid) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); } ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} new attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), NewAttachmentCount, Attachments.size()); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); if (StoreUpstream) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } Request.WriteResponse(HttpResponseCode::Created); } else { Request.WriteResponse(HttpResponseCode::BadRequest); } } void HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; case kHead: case kGet: { HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } } break; case kPut: HandlePutCachePayload(Request, Ref, Policy); break; default: break; } } void HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); bool InUpstreamCache = false; const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); if (QueryUpstream) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { Payload = UpstreamResult.Value; IoHash ChunkHash = IoHash::HashBuffer(Payload); CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); InUpstreamCache = true; m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } else { ZEN_WARN("got uncompressed upstream cache payload"); } } } if (!Payload) { ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, NiceBytes(Payload.Size()), Payload.GetContentType(), InUpstreamCache ? "UPSTREAM" : "LOCAL"); Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } void HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored ZEN_UNUSED(Policy); IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(HttpResponseCode::BadRequest); } IoHash ChunkHash = IoHash::HashBuffer(Body); CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); } if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, NiceBytes(Body.Size()), Body.GetContentType(), Result.New ? "NEW" : "OLD"); const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; Request.WriteResponse(ResponseCode); } bool HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; } std::string_view HashSegment; std::string_view PayloadSegment; std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (PayloadSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + payload lookup HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); PayloadSegment = Key.substr(PayloadSplitOffset + 1); } if (HashSegment.size() != IoHash::StringLength) { return false; } if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) { const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { return false; } } else { OutRef.PayloadId = IoHash::Zero; } const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } } // namespace zen