From e664bbc31ff4ea866401c9303c53108242a6cb27 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Aug 2021 10:05:20 +0200 Subject: Implemented flush operations for cache services Also implemented basic upstream query interface, which needs a bit more work to be fully functional (chunk propagation / fetching and new propagation policies as per DDC requirements) --- zenserver/cache/structuredcache.cpp | 184 ++++++++++++++++++++++++++++++++---- 1 file changed, 167 insertions(+), 17 deletions(-) (limited to 'zenserver/cache/structuredcache.cpp') diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 9083f764e..7f6f49e06 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" +#include "upstream/zen.h" #include "zenstore/cidstore.h" #include @@ -22,12 +23,36 @@ namespace zen { using namespace std::literals; +zen::HttpContentType +MapToHttpContentType(zen::ZenContentType Type) +{ + switch (Type) + { + default: + case zen::ZenContentType::kBinary: + return zen::HttpContentType::kBinary; + case zen::ZenContentType::kCbObject: + return zen::HttpContentType::kCbObject; + case zen::ZenContentType::kCbPackage: + return zen::HttpContentType::kCbPackage; + case zen::ZenContentType::kText: + return zen::HttpContentType::kText; + case zen::ZenContentType::kJSON: + return zen::HttpContentType::kJSON; + case zen::ZenContentType::kYAML: + return zen::HttpContentType::kYAML; + } +}; + HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path RootPath, zen::CasStore& InStore, zen::CidStore& InCidStore) -: m_CasStore(InStore) +: m_Log("cache", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks())) +, m_CasStore(InStore) , m_CacheStore(InStore, RootPath) , m_CidStore(InCidStore) { - spdlog::info("initializing structured cache at '{}'", RootPath); + m_Log.set_level(spdlog::level::debug); + + m_Log.info("initializing structured cache at '{}'", RootPath); #if 0 m_Cloud = new CloudCacheClient("https://jupiter.devtools.epicgames.com"sv, @@ -36,6 +61,14 @@ HttpStructuredCacheService::HttpStructuredCacheService(std::filesystem::path Roo "0oao91lrhqPiAlaGD0x7"sv /* client id */, "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv /* oauth secret */); #endif + +#if 0 + std::string_view UpstreamSpec = "http://arn-wd-15192.epicgames.net"sv; + + m_Log.info("Using upstream Zen cache at '{}'", UpstreamSpec); + + m_ZenClient = new ZenStructuredCacheClient(UpstreamSpec); +#endif } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -49,6 +82,11 @@ HttpStructuredCacheService::BaseUri() const return "/z$/"; } +void +HttpStructuredCacheService::Flush() +{ +} + void HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) { @@ -84,8 +122,73 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZenCacheValue Value; bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + if (!Success && m_ZenClient) + { + ZenStructuredCacheSession Session(*m_ZenClient); + + zen::Stopwatch Timer; + + try + { + Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + + m_Log.debug("Zen upstream GET ({}/{}) succeeded after {:5}!", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + // TODO: this is incomplete and needs to propagate any referenced content + + Success = true; + } + catch (std::exception& e) + { + m_Log.warn("Zen upstream GET ({}/{}) FAILED after {:5}: '{}'", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } + + if (m_Cloud) + { + // Note that this is not fully functional, pending implementation work on + // the Jupiter end + + CloudCacheSession Session(m_Cloud); + + zen::Stopwatch Timer; + + try + { + Value.Value = Session.Get(Ref.BucketSegment, Ref.HashKey); + + m_Log.debug("Cloud upstream GET ({}/{}) succeeded after {:5}!", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + // TODO: this is incomplete and needs to propagate any referenced content + } + catch (std::exception& e) + { + m_Log.warn("Cloud upstream GET ({}/{}) FAILED after {:5}: '{}'", + Ref.BucketSegment, + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } + if (!Success) { + m_Log.debug("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(zen::HttpResponse::NotFound); } @@ -94,7 +197,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Request.SetSuppressResponseBody(); } - return Request.WriteResponse(zen::HttpResponse::OK, zen::HttpContentType::kBinary, Value.Value); + m_Log.debug("HIT - '{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Value.Value.Size(), + Value.Value.GetContentType()); + + return Request.WriteResponse(zen::HttpResponse::OK, MapToHttpContentType(Value.Value.GetContentType()), Value.Value); } break; @@ -139,6 +248,8 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (ValidationResult != CbValidateError::None) { + m_Log.warn("Payload for key '{}/{}' ({} bytes) failed validation", Ref.BucketSegment, Ref.HashKey, Body.Size()); + // TODO: add details in response return Request.WriteResponse(HttpResponse::BadRequest); } @@ -160,15 +271,42 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } Idx.EndArray(); - } - // TODO: store references in index + // TODO: store references in index + } } m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - // This is currently synchronous for simplicity and debuggability but should be - // made asynchronous + // This is currently synchronous for simplicity and debuggability but should + // absolutely be made asynchronous. By default these should be deferred + // because the client should not care if the data has propagated upstream or + // not + + if (m_ZenClient) + { + ZenStructuredCacheSession Session(*m_ZenClient); + + zen::Stopwatch Timer; + + try + { + Session.Put(Ref.BucketSegment, Ref.HashKey, Value); + + m_Log.debug("Zen upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + } + catch (std::exception& e) + { + m_Log.warn("Zen upstream PUT ({}) FAILED after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + + throw; + } + } if (m_Cloud) { @@ -179,16 +317,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req try { Session.Put(Ref.BucketSegment, Ref.HashKey, Value); - spdlog::debug("upstream PUT ({}) succeeded after {:5}!", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); + + m_Log.debug("upstream PUT ({}) succeeded after {:5}!", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs())); } catch (std::exception& e) { - spdlog::debug("upstream PUT ({}) failed after {:5}: '{}'", - Ref.HashKey, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + m_Log.warn("upstream PUT ({}) FAILED after {:5}: '{}'", + Ref.HashKey, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); throw; } @@ -227,16 +366,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re case kHead: case kGet: { - // TODO: need to map from uncompressed content address into the storage - // (compressed) content address - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); if (!Payload) { + m_Log.debug("MISS - '{}/{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + Payload.Size(), + Payload.GetContentType()); + return Request.WriteResponse(zen::HttpResponse::NotFound); } + m_Log.debug("HIT - '{}/{}/{}' ({} bytes, {})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + Payload.Size(), + Payload.GetContentType()); + if (Verb == kHead) { Request.SetSuppressResponseBody(); -- cgit v1.2.3