// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" #include #include #include namespace zen { using namespace std::literals; zen::HttpContentType MapToHttpContentType(zen::ZenContentType Type) { switch (Type) { default: case zen::ZenContentType::kBinary: return zen::HttpContentType::kBinary; case zen::ZenContentType::kCbObject: return zen::HttpContentType::kCbObject; case zen::ZenContentType::kCbPackage: return zen::HttpContentType::kCbPackage; case zen::ZenContentType::kText: return zen::HttpContentType::kText; case zen::ZenContentType::kJSON: return zen::HttpContentType::kJSON; case zen::ZenContentType::kYAML: return zen::HttpContentType::kYAML; } }; HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) : m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) , m_CasStore(InStore) , m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) { m_Log.set_level(spdlog::level::debug); m_Log.info("initializing structured cache at '{}'", RootPath); #if 0 m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, "ue4.ddc"sv /* namespace */, "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); #endif #if 0 std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); #endif } HttpStructuredCacheService::~HttpStructuredCacheService() { spdlog::info("closing structured cache"); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::Flush() { } void HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { CacheRef Ref; if (!ValidateUri(Request, /* out */ Ref)) { return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL } if (Ref.PayloadId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref); } else { return HandleCachePayloadRequest(Request, Ref); } return; } void HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); if (!Success && m_ZenClient) { ZenStructuredCacheSession Session(*m_ZenClient); zen::Stopwatch Timer; try { Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", Ref.BucketSegment, Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); // TODO: this is incomplete and needs to propagate any referenced content Success = true; } catch (std::exception& e) { m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", Ref.BucketSegment, Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); throw; } } if (m_Cloud) { // Note that this is not fully functional, pending implementation work on // the Jupiter end CloudCacheSession Session(m_Cloud); zen::Stopwatch Timer; try { Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", Ref.BucketSegment, Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); // TODO: this is incomplete and needs to propagate any referenced content } catch (std::exception& e) { m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", Ref.BucketSegment, Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); throw; } } if (!Success) { m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); return Request.WriteResponse(zen::HttpResponse::NotFound); } if (Verb == kHead) { Request.SetSuppressResponseBody(); } m_Log.debug("HIT - '{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Value.Value.Size(), Value.Value.GetContentType()); return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value); } break; case kPut: { if (zen::IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest); } ZenCacheValue Value; Value.Value = Body; HttpContentType ContentType = Request.RequestContentType(); bool IsCompactBinary; switch (ContentType) { case HttpContentType::kUnknownContentType: case HttpContentType::kBinary: IsCompactBinary = false; break; case HttpContentType::kCbObject: IsCompactBinary = true; break; default: return Request.WriteResponse(zen::HttpResponse::BadRequest); } // Compute index data if (IsCompactBinary) { // Validate payload before accessing it zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); if (ValidationResult != CbValidateError::None) { m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); // TODO: add details in response return Request.WriteResponse(HttpResponse::BadRequest); } // Extract data for index zen::CbObjectView Cbo(Body.Data()); std::vector References; Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); if (!References.empty()) { zen::CbObjectWriter Idx; Idx.BeginArray("r"); for (const IoHash& Hash : References) { Idx.AddHash(Hash); } Idx.EndArray(); // TODO: store references in index } } m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); // This is currently synchronous for simplicity and debuggability but should // absolutely be made asynchronous. By default these should be deferred // because the client should not care if the data has propagated upstream or // not if (m_ZenClient) { ZenStructuredCacheSession Session(*m_ZenClient); zen::Stopwatch Timer; try { Session.Put(Ref.BucketSegment, Ref.HashKey, Value); m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } catch (std::exception& e) { m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); throw; } } if (m_Cloud) { CloudCacheSession Session(m_Cloud); zen::Stopwatch Timer; try { Session.Put(Ref.BucketSegment, Ref.HashKey, Value); m_Log.debug("upstream PUT ({}) succeeded after {:5}!", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } catch (std::exception& e) { m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); throw; } } return Request.WriteResponse(zen::HttpResponse::Created); } else { return; } } break; case kPost: break; default: break; } } void HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash // // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload) { m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Payload.Size(), Payload.GetContentType()); return Request.WriteResponse(zen::HttpResponse::NotFound); } m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, Payload.Size(), Payload.GetContentType()); if (Verb == kHead) { Request.SetSuppressResponseBody(); } return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload); } break; case kPut: { if (zen::IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest, HttpContentType::kText, "Empty payload not permitted"); } zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { // All attachment payloads need to be in compressed buffer format return Request.WriteResponse(zen::HttpResponse::BadRequest, HttpContentType::kText, "Attachments must be compressed"); } else { if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { // the URL specified content id and content hashes don't match! return Request.WriteResponse(HttpResponse::BadRequest); } zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); if (Result.New) { return Request.WriteResponse(zen::HttpResponse::Created); } else { return Request.WriteResponse(zen::HttpResponse::OK); } } } } break; case kPost: break; default: break; } } bool HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); if (!std::all_of(begin(OutRef.BucketSegment), end(OutRef.BucketSegment), [](const char c) { return std::isalnum(c); })) { return false; } std::string_view HashSegment; std::string_view PayloadSegment; std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (PayloadSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + payload lookup HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); PayloadSegment = Key.substr(PayloadSplitOffset + 1); } if (HashSegment.size() != zen::IoHash::StringLength) { return false; } if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) { const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { return false; } } else { OutRef.PayloadId = zen::IoHash::Zero; } const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } } // namespace zen