// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include #include #include #include #include #include namespace zen { using namespace std::literals; zen::HttpContentType MapToHttpContentType(zen::ZenContentType Type) { switch (Type) { default: case zen::ZenContentType::kBinary: return zen::HttpContentType::kBinary; case zen::ZenContentType::kCbObject: return zen::HttpContentType::kCbObject; case zen::ZenContentType::kCbPackage: return zen::HttpContentType::kCbPackage; case zen::ZenContentType::kText: return zen::HttpContentType::kText; case zen::ZenContentType::kJSON: return zen::HttpContentType::kJSON; case zen::ZenContentType::kYAML: return zen::HttpContentType::kYAML; } }; ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, zen::CasStore& InStore, zen::CidStore& InCidStore, std::unique_ptr UpstreamCache) : m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) , m_CacheStore(InCacheStore) , m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { m_Log.set_level(spdlog::level::debug); } HttpStructuredCacheService::~HttpStructuredCacheService() { spdlog::info("closing structured cache"); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { CacheRef Ref; if (!ValidateKeyUri(Request, /* out */ Ref)) { std::string_view Key = Request.RelativeUri(); if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference return HandleCacheBucketRequest(Request, Key); } return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL } if (Ref.PayloadId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref); } else { return HandleCachePayloadRequest(Request, Ref); } return; } void HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket) { ZEN_UNUSED(Request, Bucket); switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { // Query stats } break; case kDelete: // Drop bucket if (m_CacheStore.DropBucket(Bucket)) { return Request.WriteResponse(zen::HttpResponse::OK); } else { return Request.WriteResponse(zen::HttpResponse::NotFound); } break; } } void HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); if (!Success && m_UpstreamCache) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : ZenContentType::kCbObject; if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); UpstreamResult.Success) { Value.Value = UpstreamResult.Value; Success = true; if (CacheRecordType == ZenContentType::kCbObject) { const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(MemoryView(UpstreamResult.Value.Data(), UpstreamResult.Value.Size()), zen::CbValidateMode::All); if (ValidationResult == CbValidateError::None) { zen::CbObjectView Cbo(UpstreamResult.Value.Data()); std::vector References; Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); if (!References.empty()) { zen::CbObjectWriter Idx; Idx.BeginArray("references"); for (const IoHash& Hash : References) { Idx.AddHash(Hash); } Idx.EndArray(); Value.IndexData = Idx.Save(); } } else { Value.Value = IoBuffer(); Success = false; m_Log.warn("Upstream cache record '{}/{}' failed validation", Ref.BucketSegment, Ref.HashKey); } } if (Success) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } } } if (!Success) { m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); return Request.WriteResponse(zen::HttpResponse::NotFound); } if (Verb == kHead) { Request.SetSuppressResponseBody(); } m_Log.debug("HIT - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Value.Value.Size(), Value.Value.GetContentType()); return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value); } break; case kPut: { zen::IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); bool IsCompactBinary = false; switch (ContentType) { case HttpContentType::kUnknownContentType: case HttpContentType::kBinary: IsCompactBinary = false; break; case HttpContentType::kCbObject: IsCompactBinary = true; break; default: return Request.WriteResponse(zen::HttpResponse::BadRequest); } if (!IsCompactBinary) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); m_Log.debug("PUT (binary) - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Body.Size(), Body.GetContentType()); if (m_UpstreamCache) { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); } // Validate payload before accessing it const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); if (ValidationResult != CbValidateError::None) { m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); // TODO: add details in response, kText || kCbObject? return Request.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); } // Extract referenced payload hashes zen::CbObjectView Cbo(Body.Data()); std::vector References; std::vector MissingRefs; Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); ZenCacheValue CacheValue; CacheValue.Value = Body; if (!References.empty()) { zen::CbObjectWriter Idx; Idx.BeginArray("references"); for (const IoHash& Hash : References) { Idx.AddHash(Hash); if (!m_CidStore.ContainsChunk(Hash)) { MissingRefs.push_back(Hash); } } Idx.EndArray(); CacheValue.IndexData = Idx.Save(); } m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); m_Log.debug("PUT (cache record) - '{}/{}' ({} bytes, {}, ({}/{} refs/missing))", Ref.BucketSegment, Ref.HashKey, CacheValue.Value.Size(), CacheValue.Value.GetContentType(), References.size(), MissingRefs.size()); if (MissingRefs.empty()) { // Only enqueue valid cache records, i.e. all referenced payloads exists if (m_UpstreamCache) { auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(References)}); ZEN_ASSERT(Result.Success); } return Request.WriteResponse(zen::HttpResponse::Created); } else { // TODO: Binary attachments? zen::CbObjectWriter Response; Response.BeginArray("needs"); for (const IoHash& MissingRef : MissingRefs) { Response.AddHash(MissingRef); m_Log.debug("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); } Response.EndArray(); // Return Created | BadRequest? return Request.WriteResponse(zen::HttpResponse::Created, Response.Save()); } } break; case kPost: break; default: break; } } void HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash // // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload && m_UpstreamCache) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Payload))) { Payload = UpstreamResult.Payload; zen::IoHash ChunkHash = zen::IoHash::HashMemory(Payload); zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } else { m_Log.warn("got uncompressed upstream cache payload"); } } } if (!Payload) { m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Payload.Size(), Payload.GetContentType()); return Request.WriteResponse(zen::HttpResponse::NotFound); } m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Payload.Size(), Payload.GetContentType()); if (Verb == kHead) { Request.SetSuppressResponseBody(); } return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload); } break; case kPut: { if (zen::IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest, HttpContentType::kText, "Empty payload not permitted"); } zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { // All attachment payloads need to be in compressed buffer format return Request.WriteResponse(zen::HttpResponse::BadRequest, HttpContentType::kText, "Attachments must be compressed"); } else { if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { // the URL specified content id and content hashes don't match! return Request.WriteResponse(HttpResponse::BadRequest); } zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); m_Log.debug("PUT ({}) - '{}/{}/{}' ({} bytes, {})", Result.New ? "NEW" : "OLD", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Body.Size(), Body.GetContentType()); if (Result.New) { return Request.WriteResponse(zen::HttpResponse::Created); } else { return Request.WriteResponse(zen::HttpResponse::OK); } } } } break; case kPost: break; default: break; } } bool HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = ToLower(Key.substr(0, BucketSplitOffset)); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; } std::string_view HashSegment; std::string_view PayloadSegment; std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (PayloadSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + payload lookup HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); PayloadSegment = Key.substr(PayloadSplitOffset + 1); } if (HashSegment.size() != zen::IoHash::StringLength) { return false; } if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) { const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { return false; } } else { OutRef.PayloadId = zen::IoHash::Zero; } const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } } // namespace zen