// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include #include #include #include #include #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" #include "zenstore/cidstore.h" #include #include namespace zen { using namespace std::literals; HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) : m_CasStore(InStore) , m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) { spdlog::info("initializing structured cache at '{}'", RootPath); #if 0 m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, "ue4.ddc"sv /* namespace */, "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv /* provider */, "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); #endif } HttpStructuredCacheService::~HttpStructuredCacheService() { spdlog::info("closing structured cache"); } const char* HttpStructuredCacheService::BaseUri() const { return "/z$/"; } void HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { CacheRef Ref; if (!ValidateUri(Request, /* out */ Ref)) { return Request.WriteResponse(zen::HttpResponse::BadRequest); // invalid URL } if (Ref.PayloadId == IoHash::Zero) { return HandleCacheRecordRequest(Request, Ref); } else { return HandleCachePayloadRequest(Request, Ref); } return; } void HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); if (!Success) { return Request.WriteResponse(zen::HttpResponse::NotFound); } if (Verb == kHead) { Request.SetSuppressResponseBody(); } return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); } break; case kPut: { if (zen::IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest); } ZenCacheValue Value; Value.Value = Body; HttpContentType ContentType = Request.RequestContentType(); bool IsCompactBinary; switch (ContentType) { case HttpContentType::kUnknownContentType: case HttpContentType::kBinary: IsCompactBinary = false; break; case HttpContentType::kCbObject: IsCompactBinary = true; break; default: return Request.WriteResponse(zen::HttpResponse::BadRequest); } // Compute index data if (IsCompactBinary) { // Validate payload before accessing it zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); if (ValidationResult != CbValidateError::None) { // TODO: add details in response return Request.WriteResponse(HttpResponse::BadRequest); } // Extract data for index zen::CbObjectView Cbo(Body.Data()); std::vector References; Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); if (!References.empty()) { zen::CbObjectWriter Idx; Idx.BeginArray("r"); for (const IoHash& Hash : References) { Idx.AddHash(Hash); } Idx.EndArray(); } // TODO: store references in index } m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); // This is currently synchronous for simplicity and debuggability but should be // made asynchronous if (m_Cloud) { CloudCacheSession Session(m_Cloud); zen::Stopwatch Timer; try { Session.Put(Ref.BucketSegment, Ref.HashKey, Value); spdlog::debug("upstream PUT ({}) succeeded after {:5}!", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } catch (std::exception& e) { spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'", Ref.HashKey, zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), e.what()); throw; } } return Request.WriteResponse(zen::HttpResponse::Created); } else { return; } } break; case kPost: break; default: break; } } void HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity to the stored payload hash // // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash switch (auto Verb = Request.RequestVerb()) { using enum zen::HttpVerb; case kHead: case kGet: { // TODO: need to map from uncompressed content address into the storage // (compressed) content address zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload) { return Request.WriteResponse(zen::HttpResponse::NotFound); } if (Verb == kHead) { Request.SetSuppressResponseBody(); } return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Payload); } break; case kPut: { if (zen::IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { return Request.WriteResponse(zen::HttpResponse::BadRequest); } zen::IoHash ChunkHash = zen::IoHash::HashMemory(Body); zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) { // the URL specified content id and content hashes don't match! return Request.WriteResponse(HttpResponse::BadRequest); } zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); if (Result.New) { return Request.WriteResponse(zen::HttpResponse::Created); } else { return Request.WriteResponse(zen::HttpResponse::OK); } } } break; case kPost: break; default: break; } } bool HttpStructuredCacheService::ValidateUri(zen::HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); if (BucketSplitOffset == std::string_view::npos) { return false; } OutRef.BucketSegment = Key.substr(0, BucketSplitOffset); std::string_view HashSegment; std::string_view PayloadSegment; std::string_view::size_type PayloadSplitOffset = Key.find_last_of('/'); // We know there is a slash so no need to check for npos return if (PayloadSplitOffset == BucketSplitOffset) { // Basic cache record lookup HashSegment = Key.substr(BucketSplitOffset + 1); } else { // Cache record + payload lookup HashSegment = Key.substr(BucketSplitOffset + 1, PayloadSplitOffset - BucketSplitOffset - 1); PayloadSegment = Key.substr(PayloadSplitOffset + 1); } if (HashSegment.size() != zen::IoHash::StringLength) { return false; } if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) { const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { return false; } } else { OutRef.PayloadId = zen::IoHash::Zero; } const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { return false; } return true; } } // namespace zen