diff options
| author | Per Larsson <[email protected]> | 2022-01-24 11:11:10 +0100 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2022-01-24 11:11:10 +0100 |
| commit | dc6becffb513280170958f94e18c1b2966ade4d1 (patch) | |
| tree | c7f9cccafcc21e241abdecde6f5219ab1009aff6 | |
| parent | Format fix. (diff) | |
| download | zen-dc6becffb513280170958f94e18c1b2966ade4d1.tar.xz zen-dc6becffb513280170958f94e18c1b2966ade4d1.zip | |
Refactored upstream cache to better handle different states in prep for dynamic auth tokens.
| -rw-r--r-- | zenserver/auth/authservice.cpp | 37 | ||||
| -rw-r--r-- | zenserver/auth/authservice.h | 22 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 51 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 32 | ||||
| -rw-r--r-- | zenserver/config.cpp | 7 | ||||
| -rw-r--r-- | zenserver/config.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 1 | ||||
| -rw-r--r-- | zenserver/upstream/upstream.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 690 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 82 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamservice.cpp | 208 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamservice.h | 25 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 1 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 78 |
14 files changed, 804 insertions, 439 deletions
diff --git a/zenserver/auth/authservice.cpp b/zenserver/auth/authservice.cpp new file mode 100644 index 000000000..c6def15b4 --- /dev/null +++ b/zenserver/auth/authservice.cpp @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <auth/authservice.h> +#include <zencore/string.h> + +namespace zen { + +using namespace std::literals; + +HttpAuthService::HttpAuthService() +{ + m_Router.RegisterRoute( + "token", + [this](HttpRouterRequest& RouterRequest) { + HttpServerRequest& ServerRequest = RouterRequest.ServerRequest(); + ServerRequest.WriteResponse(HttpResponseCode::OK); + }, + HttpVerb::kPost); +} + +HttpAuthService::~HttpAuthService() +{ +} + +const char* +HttpAuthService::BaseUri() const +{ + return "/auth/"; +} + +void +HttpAuthService::HandleRequest(zen::HttpServerRequest& Request) +{ + m_Router.HandleRequest(Request); +} + +} // namespace zen diff --git a/zenserver/auth/authservice.h b/zenserver/auth/authservice.h new file mode 100644 index 000000000..30b2b5864 --- /dev/null +++ b/zenserver/auth/authservice.h @@ -0,0 +1,22 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +namespace zen { + +class HttpAuthService final : public zen::HttpService +{ +public: + HttpAuthService(); + virtual ~HttpAuthService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + HttpRequestRouter m_Router; +}; + +} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 2675590e0..5918d5178 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -59,17 +59,17 @@ struct AttachmentCount ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - std::unique_ptr<UpstreamCache> UpstreamCache) +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_StatsService(StatsService) , m_StatusService(StatusService) , m_CidStore(InCidStore) -, m_UpstreamCache(std::move(UpstreamCache)) +, m_UpstreamCache(UpstreamCache) { m_StatsService.RegisterHandler("z$", *this); m_StatusService.RegisterHandler("z$", *this); @@ -206,7 +206,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; const bool PartialOnError = (Policy & CachePolicy::PartialOnError) == CachePolicy::PartialOnError; - const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); + const bool QueryUpstream = (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; bool Success = false; ZenCacheValue LocalCacheValue; @@ -292,7 +292,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); - if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); + if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache.GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { Success = true; @@ -437,7 +437,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request } const HttpContentType ContentType = Request.RequestContentType(); - const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + const bool StoreUpstream = (Policy & CachePolicy::StoreRemote) == CachePolicy::StoreRemote; Body.SetContentType(ContentType); @@ -448,8 +448,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (StoreUpstream) { - ZEN_ASSERT(m_UpstreamCache); - m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); + m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kBinary, .Key = {Ref.BucketSegment, Ref.HashKey}}); } Request.WriteResponse(HttpResponseCode::Created); @@ -492,8 +491,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (StoreUpstream && !IsPartialRecord) { - ZEN_ASSERT(m_UpstreamCache); - m_UpstreamCache->EnqueueUpstream( + m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCbObject, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } @@ -574,8 +572,7 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (StoreUpstream && !IsPartialRecord) { - ZEN_ASSERT(m_UpstreamCache); - m_UpstreamCache->EnqueueUpstream( + m_UpstreamCache.EnqueueUpstream( {.Type = ZenContentType::kCbPackage, .Key = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(ValidAttachments)}); } @@ -611,11 +608,11 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques { IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); bool InUpstreamCache = false; - const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + const bool QueryUpstream = !Payload && (Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache.GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -825,7 +822,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError); const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments); - const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache; + const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote); for (CbFieldView KeyView : Params["CacheKeys"sv]) { @@ -896,7 +893,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req ++KeyIndex; } - if (!UpstreamRequests.empty() && m_UpstreamCache) + if (!UpstreamRequests.empty()) { const auto OnCacheRecordGetComplete = [this, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) { @@ -973,7 +970,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Req } }; - m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + m_UpstreamCache.GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); } CbObjectWriter ResponseObject; @@ -1156,7 +1153,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re ++RequestIndex; } - if (!UpstreamRequests.empty() && m_UpstreamCache) + if (!UpstreamRequests.empty()) { const auto OnCachePayloadGetComplete = [this, &Chunks](CachePayloadGetCompleteParams&& Params) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) @@ -1183,7 +1180,7 @@ HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Re } }; - m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + m_UpstreamCache.GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); } CbPackage RpcResponse; @@ -1242,13 +1239,9 @@ HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) : 0.0); Cbo.EndObject(); - - if (m_UpstreamCache) - { - Cbo.BeginObject("upstream"); - m_UpstreamCache->GetStatus(Cbo); - Cbo.EndObject(); - } + Cbo.BeginObject("upstream"); + m_UpstreamCache.GetStatus(Cbo); + Cbo.EndObject(); Cbo.BeginObject("cas"); Cbo.BeginObject("size"); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 9ee7da99b..1bf3940e7 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -55,11 +55,11 @@ enum class CachePolicy : uint32_t; class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: - HttpStructuredCacheService(ZenCacheStore& InCacheStore, - CidStore& InCidStore, - HttpStatsService& StatsService, - HttpStatusService& StatusService, - std::unique_ptr<UpstreamCache> UpstreamCache); + HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, + UpstreamCache& UpstreamCache); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; @@ -97,17 +97,17 @@ private: virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - ZenCacheStore& m_CacheStore; - HttpStatsService& m_StatsService; - HttpStatusService& m_StatusService; - CidStore& m_CidStore; - std::unique_ptr<UpstreamCache> m_UpstreamCache; - uint64_t m_LastScrubTime = 0; - metrics::OperationTiming m_HttpRequests; - metrics::OperationTiming m_UpstreamGetRequestTiming; - CacheStats m_CacheStats; + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + ZenCacheStore& m_CacheStore; + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + CidStore& m_CidStore; + UpstreamCache& m_UpstreamCache; + uint64_t m_LastScrubTime = 0; + metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; + CacheStats m_CacheStats; }; } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index da37efbb7..4afe012dd 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -271,13 +271,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("cache", "", - "upstream-stats", - "Collect performance metrics for upstream endpoints", - cxxopts::value<bool>(ServerOptions.UpstreamCacheConfig.StatsEnabled)->default_value("false"), - ""); - - options.add_option("cache", - "", "upstream-connect-timeout-ms", "Connect timeout in millisecond(s). Default 5000 ms.", cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"), diff --git a/zenserver/config.h b/zenserver/config.h index e994c248e..2c31e7bd9 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -54,7 +54,6 @@ struct ZenUpstreamCacheConfig int32_t ConnectTimeoutMilliseconds = 5000; int32_t TimeoutMilliseconds = 0; UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; - bool StatsEnabled = false; }; struct ZenCacheEvictionPolicy diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 68c7361e0..bb1797393 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -132,6 +132,7 @@ private: struct CloudCacheClientOptions { + std::string_view Name; std::string_view ServiceUrl; std::string_view DdcNamespace; std::string_view BlobStoreNamespace; diff --git a/zenserver/upstream/upstream.h b/zenserver/upstream/upstream.h new file mode 100644 index 000000000..a57301206 --- /dev/null +++ b/zenserver/upstream/upstream.h @@ -0,0 +1,8 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <upstream/jupiter.h> +#include <upstream/upstreamcache.h> +#include <upstream/upstreamservice.h> +#include <upstream/zen.h> diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 65624ef17..3d6641a4f 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -25,6 +25,7 @@ #include <algorithm> #include <atomic> +#include <shared_mutex> #include <thread> #include <unordered_map> @@ -34,6 +35,52 @@ using namespace std::literals; namespace detail { + class UpstreamStatus + { + public: + UpstreamEndpointState EndpointState() const { return static_cast<UpstreamEndpointState>(m_State.load(std::memory_order_relaxed)); } + + UpstreamEndpointStatus EndpointStatus() const + { + const UpstreamEndpointState State = EndpointState(); + { + std::unique_lock _(m_Mutex); + return {.Reason = m_ErrorText, .State = State}; + } + } + + void Set(UpstreamEndpointState NewState) + { + m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed); + { + std::unique_lock _(m_Mutex); + m_ErrorText.clear(); + } + } + + void Set(UpstreamEndpointState NewState, std::string ErrorText) + { + m_State.store(static_cast<uint32_t>(NewState), std::memory_order_relaxed); + { + std::unique_lock _(m_Mutex); + m_ErrorText = std::move(ErrorText); + } + } + + void SetFromErrorCode(int32_t ErrorCode, std::string_view ErrorText) + { + if (ErrorCode != 0) + { + Set(ErrorCode == 401 ? UpstreamEndpointState::kUnauthorized : UpstreamEndpointState::kError, std::string(ErrorText)); + } + } + + private: + mutable std::mutex m_Mutex; + std::string m_ErrorText; + std::atomic_uint32_t m_State; + }; + class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -41,7 +88,8 @@ namespace detail { : m_Log(zen::logging::Get("upstream")) , m_UseLegacyDdc(Options.UseLegacyDdc) { - m_Info.Name = "Horde"sv; + ZEN_ASSERT(!Options.Name.empty()); + m_Info.Name = Options.Name; m_Info.Url = Options.ServiceUrl; m_Client = new CloudCacheClient(Options); } @@ -50,27 +98,45 @@ namespace detail { virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } - virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } - - virtual bool IsHealthy() const override { return m_HealthOk.load(); } - - virtual UpstreamEndpointHealth CheckHealth() override + virtual UpstreamEndpointStatus Initialize() override { try { + if (m_Status.EndpointState() == UpstreamEndpointState::kOk) + { + return {.State = UpstreamEndpointState::kOk}; + } + CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); - m_HealthOk = Result.Success && Result.ErrorCode == 0; + if (Result.Success) + { + m_Status.Set(UpstreamEndpointState::kOk); + } + else if (Result.ErrorCode != 0) + { + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + } + else + { + m_Status.Set(UpstreamEndpointState::kUnauthorized); + } - return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + return m_Status.EndpointStatus(); } catch (std::exception& Err) { - return {.Reason = Err.what(), .Ok = false}; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + + return {.Reason = Err.what(), .State = GetState()}; } } + virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } + + virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Horde::GetSingleCacheRecord"); @@ -127,6 +193,8 @@ namespace detail { } } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.ErrorCode == 0) { return {.Value = Result.Response, @@ -136,13 +204,13 @@ namespace detail { } else { - m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -170,6 +238,8 @@ namespace detail { CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); + m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); + if (RefResult.ErrorCode == 0) { const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); @@ -180,6 +250,8 @@ namespace detail { CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); AppendResult(BlobResult, Result); + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); + if (BlobResult.ErrorCode == 0) { if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) @@ -187,17 +259,9 @@ namespace detail { Package.AddAttachment(CbAttachment(Chunk)); } } - else - { - m_HealthOk = false; - } }); } } - else - { - m_HealthOk = false; - } } OnComplete({.Key = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); @@ -215,6 +279,8 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.ErrorCode == 0) { return {.Value = Result.Response, @@ -224,13 +290,13 @@ namespace detail { } else { - m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -255,7 +321,8 @@ namespace detail { Payload = BlobResult.Response; AppendResult(BlobResult, Result); - m_HealthOk = BlobResult.ErrorCode == 0; + + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); } OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); @@ -292,7 +359,7 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); return {.Reason = std::move(Result.Reason), .Bytes = Result.Bytes, @@ -323,7 +390,7 @@ namespace detail { BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } - m_HealthOk = BlobResult.ErrorCode == 0; + m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); if (!BlobResult.Success) { @@ -344,7 +411,7 @@ namespace detail { RefResult = Session.PutRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, ZenContentType::kCbObject); } - m_HealthOk = RefResult.ErrorCode == 0; + m_Status.SetFromErrorCode(RefResult.ErrorCode, RefResult.Reason); if (!RefResult.Success) { @@ -366,7 +433,8 @@ namespace detail { const IoHash RefHash = IoHash::HashBuffer(RecordValue); FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); - m_HealthOk = FinalizeResult.ErrorCode == 0; + + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); if (!FinalizeResult.Success) { @@ -385,7 +453,8 @@ namespace detail { } FinalizeResult = Session.FinalizeRef(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RefHash); - m_HealthOk = FinalizeResult.ErrorCode == 0; + + m_Status.SetFromErrorCode(FinalizeResult.ErrorCode, FinalizeResult.Reason); if (!FinalizeResult.Success) { @@ -420,7 +489,8 @@ namespace detail { } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -444,11 +514,10 @@ namespace detail { spdlog::logger& m_Log; UpstreamEndpointInfo m_Info; + UpstreamStatus m_Status; + UpstreamEndpointStats m_Stats; bool m_UseLegacyDdc; - std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; - UpstreamEndpointStats m_Stats; - std::atomic_bool m_HealthOk{false}; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -466,7 +535,7 @@ namespace detail { public: ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) : m_Log(zen::logging::Get("upstream")) - , m_Info({.Name = std::string("Zen")}) + , m_Info({.Name = std::string(Options.Name)}) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) { @@ -480,62 +549,43 @@ namespace detail { virtual const UpstreamEndpointInfo& GetEndpointInfo() const override { return m_Info; } - virtual UpstreamEndpointHealth Initialize() override - { - const ZenEndpoint& Ep = GetEndpoint(); - if (Ep.Ok) - { - m_Info.Url = Ep.Url; - m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); - - m_HealthOk = true; - return {.Ok = true}; - } - - m_HealthOk = false; - return {.Reason = Ep.Reason}; - } - - virtual bool IsHealthy() const override { return m_HealthOk; } - - virtual UpstreamEndpointHealth CheckHealth() override + virtual UpstreamEndpointStatus Initialize() override { try { - if (m_Client.IsNull()) + if (m_Status.EndpointState() == UpstreamEndpointState::kOk) { - const ZenEndpoint& Ep = GetEndpoint(); - if (Ep.Ok) - { - m_Info.Url = Ep.Url; - m_Client = - new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); - - m_HealthOk = true; - return {.Ok = true}; - } - - return {.Reason = Ep.Reason}; + return {.State = UpstreamEndpointState::kOk}; } - ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result; + const ZenEndpoint& Ep = GetEndpoint(); - for (int32_t Attempt = 0, MaxAttempts = 2; Attempt < MaxAttempts && !Result.Success; ++Attempt) + m_Info.Url = Ep.Url; + + if (Ep.Ok) { - Result = Session.CheckHealth(); + m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); + m_Status.Set(UpstreamEndpointState::kOk); + } + else + { + m_Status.Set(UpstreamEndpointState::kError, Ep.Reason); } - m_HealthOk = Result.ErrorCode == 0; - - return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk}; + return m_Status.EndpointStatus(); } catch (std::exception& Err) { - return {.Reason = Err.what(), .Ok = false}; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + + return {.Reason = Err.what(), .State = GetState()}; } } + virtual UpstreamEndpointState GetState() override { return m_Status.EndpointState(); } + + virtual UpstreamEndpointStatus GetStatus() override { return m_Status.EndpointStatus(); } + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::Zen::GetSingleCacheRecord"); @@ -545,6 +595,8 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.ErrorCode == 0) { return {.Value = Result.Response, @@ -554,13 +606,13 @@ namespace detail { } else { - m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -608,6 +660,8 @@ namespace detail { Result = Session.InvokeRpc(BatchRequest.Save()); } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.Success) { if (BatchResponse.TryLoad(Result.Response)) @@ -621,10 +675,6 @@ namespace detail { return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } - else if (Result.ErrorCode) - { - m_HealthOk = false; - } for (size_t Index : KeyIndex) { @@ -643,6 +693,8 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.ErrorCode == 0) { return {.Value = Result.Response, @@ -652,13 +704,13 @@ namespace detail { } else { - m_HealthOk = false; return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; } } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -713,6 +765,8 @@ namespace detail { Result = Session.InvokeRpc(BatchRequest.Save()); } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + if (Result.Success) { if (BatchResponse.TryLoad(Result.Response)) @@ -736,10 +790,6 @@ namespace detail { return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; } } - else if (Result.ErrorCode) - { - m_HealthOk = false; - } for (size_t Index : RequestIndex) { @@ -789,10 +839,10 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, PackagePayload, CacheRecord.Type); - - m_HealthOk = Result.ErrorCode == 0; } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + TotalBytes = Result.Bytes; TotalElapsedSeconds = Result.ElapsedSeconds; } @@ -807,10 +857,10 @@ namespace detail { CacheRecord.Key.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); - - m_HealthOk = Result.ErrorCode == 0; } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; @@ -827,10 +877,10 @@ namespace detail { for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { Result = Session.PutCacheRecord(CacheRecord.Key.Bucket, CacheRecord.Key.Hash, RecordValue, CacheRecord.Type); - - m_HealthOk = Result.ErrorCode == 0; } + m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); + TotalBytes += Result.Bytes; TotalElapsedSeconds += Result.ElapsedSeconds; } @@ -842,7 +892,8 @@ namespace detail { } catch (std::exception& Err) { - m_HealthOk = false; + m_Status.Set(UpstreamEndpointState::kError, Err.what()); + return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -885,109 +936,18 @@ namespace detail { spdlog::logger& m_Log; UpstreamEndpointInfo m_Info; + UpstreamStatus m_Status; + UpstreamEndpointStats m_Stats; std::vector<ZenEndpoint> m_Endpoints; std::chrono::milliseconds m_ConnectTimeout; std::chrono::milliseconds m_Timeout; RefPtr<ZenStructuredCacheClient> m_Client; - UpstreamEndpointStats m_Stats; - std::atomic_bool m_HealthOk{false}; }; } // namespace detail ////////////////////////////////////////////////////////////////////////// -struct UpstreamStats -{ - static constexpr uint64_t MaxSampleCount = 1000ull; - - UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} - - void Add(spdlog::logger& Logger, - UpstreamEndpoint& Endpoint, - const GetUpstreamCacheResult& Result, - const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) - { - UpstreamEndpointStats& Stats = Endpoint.Stats(); - - if (Result.Error) - { - Stats.ErrorCount++; - } - else if (Result.Success) - { - Stats.HitCount++; - Stats.DownBytes.fetch_add(Result.Bytes); - Stats.TimeDownMs.fetch_add(uint64_t(Result.ElapsedSeconds * 1000.0)); - } - else - { - Stats.MissCount++; - } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); - } - } - - void Add(spdlog::logger& Logger, - UpstreamEndpoint& Endpoint, - const PutUpstreamCacheResult& Result, - const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) - { - UpstreamEndpointStats& Stats = Endpoint.Stats(); - if (Result.Success) - { - Stats.UpCount++; - Stats.UpBytes.fetch_add(Result.Bytes); - Stats.TimeUpMs.fetch_add(uint64_t(Result.ElapsedSeconds * 1000.0)); - } - else - { - Stats.ErrorCount++; - } - - if (m_Enabled && m_SampleCount++ % MaxSampleCount) - { - Dump(Logger, Endpoints); - } - } - - void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) - { - for (auto& Ep : Endpoints) - { - // These stats will not be totally correct as the numbers are not captured atomically - UpstreamEndpointStats& Stats = Ep->Stats(); - const uint64_t HitCount = Stats.HitCount; - const uint64_t MissCount = Stats.MissCount; - const double DownMBytes = double(Stats.DownBytes) / 1024.0 / 1024.0; - const double SecondsDown = double(Stats.TimeDownMs) / 1000.0; - const double UpMBytes = double(Stats.UpBytes) / 1024.0 / 1024.0; - const double SecondsUp = double(Stats.TimeUpMs) / 1000.0; - - const double UpSpeed = UpMBytes > 0 ? UpMBytes / SecondsUp : 0.0; - const double DownSpeed = DownMBytes > 0 ? DownMBytes / SecondsDown : 0.0; - const uint64_t TotalCount = HitCount + MissCount; - const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; - - Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->GetEndpointInfo().Name, - HitRate, - DownMBytes, - DownSpeed, - UpMBytes, - UpSpeed); - } - } - - bool m_Enabled; - std::atomic_uint64_t m_SampleCount = {}; -}; - -////////////////////////////////////////////////////////////////////////// - class DefaultUpstreamCache final : public UpstreamCache { public: @@ -996,71 +956,87 @@ public: , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) - , m_Stats(Options.StatsEnabled) { } virtual ~DefaultUpstreamCache() { Shutdown(); } - virtual bool Initialize() override + virtual void Initialize() override { - for (auto& Endpoint : m_Endpoints) + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - const UpstreamEndpointHealth Health = Endpoint->Initialize(); - const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); - - if (Health.Ok) - { - ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url); - } - else - { - ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); - } + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } - m_RunState.IsRunning = !m_Endpoints.empty(); + m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); + m_RunState.IsRunning = true; + } + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override + { + const UpstreamEndpointStatus Status = Endpoint->Initialize(); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); - if (m_RunState.IsRunning) + ZEN_INFO("register endpoint '{} - {}' {}", Info.Name, Info.Url, ToString(Status.State)); + + // Register endpoint even if it fails, the health monitor thread will probe failing endpoint(s) + std::unique_lock<std::shared_mutex> _(m_EndpointsMutex); + m_Endpoints.emplace_back(std::move(Endpoint)); + } + + virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) override + { + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + for (auto& Ep : m_Endpoints) { - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + if (!Fn(*Ep)) { - m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + break; } - - m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); } - - return m_RunState.IsRunning; } - virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { ZEN_TRACE_CPU("Upstream::GetCacheRecord"); + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->GetState() != UpstreamEndpointState::kOk) { - const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + continue; + } - if (Result.Success) - { - return Result; - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + GetUpstreamCacheResult Result; + { + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + Result = Endpoint->GetCacheRecord(CacheKey, Type); + } - if (Result.Error) - { - ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } + Stats.CacheGetCount.Increment(1); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); + + if (Result.Success) + { + Stats.CacheHitCount.Increment(1); + + return Result; + } + + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } } } @@ -1075,42 +1051,62 @@ public: { ZEN_TRACE_CPU("Upstream::GetCacheRecords"); - std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end()); + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + std::vector<size_t> RemainingKeys(KeyIndex.begin(), KeyIndex.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy() && !MissingKeys.empty()) + if (RemainingKeys.empty()) + { + break; + } + + if (Endpoint->GetState() != UpstreamEndpointState::kOk) + { + continue; + } + + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<size_t> Missing; + GetUpstreamCacheResult Result; { - std::vector<size_t> Missing; + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); - auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + Result = Endpoint->GetCacheRecords(CacheKeys, RemainingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { if (Params.Record) { OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + + Stats.CacheHitCount.Increment(1); } else { Missing.push_back(Params.KeyIndex); } }); + } - if (Result.Error) - { - ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } + Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); - MissingKeys = std::move(Missing); + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } + + RemainingKeys = std::move(Missing); } } - for (size_t Index : MissingKeys) + for (size_t Index : RemainingKeys) { OnComplete({.Key = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); } @@ -1122,43 +1118,62 @@ public: { ZEN_TRACE_CPU("Upstream::GetCachePayloads"); - std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end()); + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + std::vector<size_t> RemainingKeys(RequestIndex.begin(), RequestIndex.end()); if (m_Options.ReadUpstream) { for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy() && !MissingPayloads.empty()) + if (RemainingKeys.empty()) { - std::vector<size_t> Missing; + break; + } - auto Result = - Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { - if (Params.Payload) - { - OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); - } - else - { - Missing.push_back(Params.RequestIndex); - } - }); + if (Endpoint->GetState() != UpstreamEndpointState::kOk) + { + continue; + } - if (Result.Error) - { - ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + std::vector<size_t> Missing; + GetUpstreamCacheResult Result; + { + metrics::OperationTiming::Scope Scope(Endpoint->Stats().CacheGetRequestTiming); + + Result = Endpoint->GetCachePayloads(CacheChunkRequests, RemainingKeys, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + + Stats.CacheHitCount.Increment(1); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); + } - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); - MissingPayloads = std::move(Missing); + Stats.CacheGetCount.Increment(int64_t(RemainingKeys.size())); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); + + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } + + RemainingKeys = std::move(Missing); } } - for (size_t Index : MissingPayloads) + for (size_t Index : RemainingKeys) { OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); } @@ -1172,23 +1187,37 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->GetState() != UpstreamEndpointState::kOk) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + continue; + } - if (Result.Success) - { - return Result; - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + GetUpstreamCacheResult Result; - if (Result.Error) - { - ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", - Endpoint->GetEndpointInfo().Url, - Result.Error.Reason, - Result.Error.ErrorCode); - } + { + metrics::OperationTiming::Scope Scope(Stats.CacheGetRequestTiming); + Result = Endpoint->GetCachePayload(CacheKey, PayloadId); + } + + Stats.CacheGetCount.Increment(1); + Stats.CacheGetTotalBytes.Increment(Result.Bytes); + + if (Result.Success) + { + Stats.CacheHitCount.Increment(1); + + return Result; + } + + if (Result.Error) + { + Stats.CacheErrorCount.Increment(1); + + ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); } } } @@ -1196,7 +1225,7 @@ public: return {}; } - virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { if (m_RunState.IsRunning && m_Options.WriteUpstream) { @@ -1208,11 +1237,7 @@ public: { ProcessCacheRecord(std::move(CacheRecord)); } - - return {.Success = true}; } - - return {}; } virtual void GetStatus(CbObjectWriter& Status) override @@ -1225,22 +1250,35 @@ public: Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { - const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo(); - Status.BeginObject(); - Status << "name" << Info.Name; - Status << "url" << Info.Url; - Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + const UpstreamEndpointInfo& EpInfo = Ep->GetEndpointInfo(); + const UpstreamEndpointStatus EpStatus = Ep->GetStatus(); + UpstreamEndpointStats& EpStats = Ep->Stats(); - UpstreamEndpointStats& Stats = Ep->Stats(); - const uint64_t HitCount = Stats.HitCount; - const uint64_t MissCount = Stats.MissCount; - const uint64_t TotalCount = HitCount + MissCount; - const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; + Status.BeginObject(); + Status << "name" << EpInfo.Name; + Status << "url" << EpInfo.Url; + Status << "state" << ToString(EpStatus.State); + Status << "reason" << EpStatus.Reason; - Status << "hit_ratio" << HitRate; - Status << "downloaded_mb" << (double(Stats.DownBytes) / 1024.0 / 1024.0); - Status << "uploaded_mb" << Stats.UpBytes; - Status << "error_count" << Stats.ErrorCount; + Status.BeginObject("cache"sv); + { + const int64_t GetCount = EpStats.CacheGetCount.Value(); + const int64_t HitCount = EpStats.CacheHitCount.Value(); + const int64_t ErrorCount = EpStats.CacheErrorCount.Value(); + const double HitRatio = GetCount > 0 ? double(HitCount) / double(GetCount) : 0.0; + const double ErrorRatio = GetCount > 0 ? double(ErrorCount) / double(GetCount) : 0.0; + + metrics::EmitSnapshot("get_requests"sv, EpStats.CacheGetRequestTiming, Status); + Status << "get_bytes" << EpStats.CacheGetTotalBytes.Value(); + Status << "get_count" << GetCount; + Status << "hit_count" << HitCount; + Status << "hit_ratio" << HitRatio; + Status << "error_count" << ErrorCount; + Status << "error_ratio" << ErrorRatio; + metrics::EmitSnapshot("put_requests"sv, EpStats.CachePutRequestTiming, Status); + Status << "put_bytes" << EpStats.CachePutTotalBytes.Value(); + } + Status.EndObject(); Status.EndObject(); } @@ -1277,21 +1315,31 @@ private: } } + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + for (auto& Endpoint : m_Endpoints) { - if (Endpoint->IsHealthy()) + if (Endpoint->GetState() != UpstreamEndpointState::kOk) { - const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + continue; + } - if (!Result.Success) - { - ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", - CacheRecord.Key.Bucket, - CacheRecord.Key.Hash, - Endpoint->GetEndpointInfo().Url, - Result.Reason); - } + UpstreamEndpointStats& Stats = Endpoint->Stats(); + PutUpstreamCacheResult Result; + { + metrics::OperationTiming::Scope Scope(Stats.CachePutRequestTiming); + Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + } + + Stats.CachePutTotalBytes.Increment(Result.Bytes); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.Key.Bucket, + CacheRecord.Key.Hash, + Endpoint->GetEndpointInfo().Url, + Result.Reason); } } } @@ -1334,21 +1382,35 @@ private: try { - for (auto& Endpoint : m_Endpoints) + std::vector<UpstreamEndpoint*> Endpoints; + { - if (!Endpoint->IsHealthy()) + std::shared_lock<std::shared_mutex> _(m_EndpointsMutex); + + for (auto& Endpoint : m_Endpoints) { - const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + if (Endpoint->GetState() == UpstreamEndpointState::kError || + Endpoint->GetState() == UpstreamEndpointState::kUnauthorized) { - ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason); - } - else - { - ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); + Endpoints.push_back(Endpoint.get()); } } } + + for (auto& Endpoint : Endpoints) + { + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + const UpstreamEndpointStatus Status = Endpoint->Initialize(); + + if (Status.State == UpstreamEndpointState::kOk) + { + ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url); + } + else + { + ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Status.Reason); + } + } } catch (std::exception& Err) { @@ -1403,7 +1465,7 @@ private: ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; - UpstreamStats m_Stats; + std::shared_mutex m_EndpointsMutex; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; std::thread m_EndpointMonitorThread; diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 20c89a574..16d8c7929 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -4,6 +4,7 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> +#include <zencore/stats.h> #include <zencore/zencore.h> #include <zenutil/cache/cache.h> @@ -35,7 +36,6 @@ struct UpstreamCacheOptions uint32_t ThreadCount = 4; bool ReadUpstream = true; bool WriteUpstream = true; - bool StatsEnabled = false; }; struct UpstreamError @@ -63,24 +63,6 @@ struct PutUpstreamCacheResult bool Success = false; }; -struct UpstreamEndpointHealth -{ - std::string Reason; - bool Ok = false; -}; - -struct UpstreamEndpointStats -{ - std::atomic_uint64_t HitCount{}; - std::atomic_uint64_t MissCount{}; - std::atomic_uint64_t UpCount{}; - std::atomic_uint64_t ErrorCount{}; - std::atomic_uint64_t UpBytes{}; - std::atomic_uint64_t DownBytes{}; - std::atomic_uint64_t TimeUpMs{}; - std::atomic_uint64_t TimeDownMs{}; -}; - struct CacheRecordGetCompleteParams { const CacheKey& Key; @@ -100,6 +82,51 @@ struct CachePayloadGetCompleteParams using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; +struct UpstreamEndpointStats +{ + metrics::OperationTiming CacheGetRequestTiming; + metrics::OperationTiming CachePutRequestTiming; + metrics::Counter CacheGetTotalBytes; + metrics::Counter CachePutTotalBytes; + metrics::Counter CacheGetCount; + metrics::Counter CacheHitCount; + metrics::Counter CacheErrorCount; +}; + +enum class UpstreamEndpointState : uint32_t +{ + kDisabled, + kUnauthorized, + kError, + kOk +}; + +inline std::string_view +ToString(UpstreamEndpointState State) +{ + using namespace std::literals; + + switch (State) + { + case UpstreamEndpointState::kDisabled: + return "Disabled"sv; + case UpstreamEndpointState::kUnauthorized: + return "Unauthorized"sv; + case UpstreamEndpointState::kError: + return "Error"sv; + case UpstreamEndpointState::kOk: + return "Ok"sv; + default: + return "Unknown"sv; + } +} + +struct UpstreamEndpointStatus +{ + std::string Reason; + UpstreamEndpointState State; +}; + struct UpstreamEndpointInfo { std::string Name; @@ -116,11 +143,11 @@ public: virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0; - virtual UpstreamEndpointHealth Initialize() = 0; + virtual UpstreamEndpointStatus Initialize() = 0; - virtual bool IsHealthy() const = 0; + virtual UpstreamEndpointState GetState() = 0; - virtual UpstreamEndpointHealth CheckHealth() = 0; + virtual UpstreamEndpointStatus GetStatus() = 0; virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; @@ -150,10 +177,12 @@ class UpstreamCache public: virtual ~UpstreamCache() = default; - virtual bool Initialize() = 0; + virtual void Initialize() = 0; virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; + virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, @@ -167,12 +196,7 @@ public: std::span<size_t> RequestIndex, OnCachePayloadGetComplete&& OnComplete) = 0; - struct EnqueueResult - { - bool Success = false; - }; - - virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; }; diff --git a/zenserver/upstream/upstreamservice.cpp b/zenserver/upstream/upstreamservice.cpp new file mode 100644 index 000000000..1cfd1df85 --- /dev/null +++ b/zenserver/upstream/upstreamservice.cpp @@ -0,0 +1,208 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <upstream/jupiter.h> +#include <upstream/upstreamcache.h> +#include <upstream/upstreamservice.h> +#include <upstream/zen.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/string.h> + +#include <json11.hpp> + +namespace zen { + +using namespace std::literals; + +namespace { + json11::Json TryGetJson(IoBuffer Body, HttpContentType ContentType, std::string& OutError) + { + if (!Body) + { + OutError = "No data"sv; + return json11::Json(); + } + + if ((ContentType == HttpContentType::kJSON || ContentType == HttpContentType::kCbObject) == false) + { + OutError = "Invalid content type"sv; + return json11::Json(); + } + + if (ContentType == ZenContentType::kJSON) + { + std::string JsonText(reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()); + return json11::Json::parse(JsonText, OutError); + } + + if (CbObject Obj = LoadCompactBinaryObject(Body)) + { + ExtendableStringBuilder<512> Sb; + return json11::Json::parse(Obj.ToJson(Sb).ToString(), OutError); + } + + OutError = "Invalid compact binary"sv; + return json11::Json(); + } + + void WriteErrorResponse(HttpServerRequest& Request, std::string_view Property, std::string_view Reason) + { + CbObjectWriter Response; + Response << "Result"sv << false; + Response.BeginObject("Error"sv); + Response << "Property"sv << Property << "Reason"sv << Reason; + Response.EndObject(); + + Request.WriteResponse(HttpResponseCode::BadRequest, Response.Save()); + } + + void WriteSuccessResponse(HttpServerRequest& Request) + { + CbObjectWriter Response; + Response << "Result"sv << true; + + Request.WriteResponse(HttpResponseCode::OK, Response.Save()); + } +} // namespace + +HttpUpstreamService::HttpUpstreamService(UpstreamCache& Upstream) : m_Upstream(Upstream) +{ + m_Router.RegisterRoute( + "endpoints", + [this](HttpRouterRequest& Req) { + CbObjectWriter Writer; + Writer.BeginArray("Endpoints"sv); + m_Upstream.IterateEndpoints([&Writer](UpstreamEndpoint& Ep) { + UpstreamEndpointInfo Info = Ep.GetEndpointInfo(); + UpstreamEndpointStatus Status = Ep.GetStatus(); + + Writer.BeginObject(); + Writer << "Name"sv << Info.Name; + Writer << "Url"sv << Info.Url; + Writer << "State"sv << ToString(Status.State); + Writer << "Reason"sv << Status.Reason; + Writer.EndObject(); + + return true; + }); + Writer.EndArray(); + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Writer.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "endpoints", + [this](HttpRouterRequest& RouterRequest) { + HttpServerRequest& ServerRequest = RouterRequest.ServerRequest(); + std::string JsonError; + + json11::Json Json = TryGetJson(ServerRequest.ReadPayload(), ServerRequest.RequestContentType(), JsonError); + + if (!JsonError.empty()) + { + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, JsonError); + } + + const auto Type = Json["Type"].string_value(); + const auto Name = Json["Name"].string_value(); + const auto Url = Json["Url"].string_value(); + const auto Namespace = Json["Namespace"].string_value(); + const auto OAuthProvider = Json["OAuthProvider"].string_value(); + const auto OAuthClientId = Json["OAuthClientId"].string_value(); + const auto OAuthSecret = Json["OAuthSecret"].string_value(); + const auto OAuthToken = Json["OAuthToken"].string_value(); + + if ((Type == "Horde"sv || Type == "Zen"sv) == false) + { + return WriteErrorResponse(ServerRequest, "Type"sv, "Invalid endpoint type, must be Zen or Horde"sv); + } + + if (Name.empty()) + { + return WriteErrorResponse(ServerRequest, "Name"sv, "Invalid endpoint name"sv); + } + + if (Url.empty()) + { + return WriteErrorResponse(ServerRequest, "Url"sv, "Invalid endpoint URL"sv); + } + + bool IsNameUnique = true; + m_Upstream.IterateEndpoints([&Name, &IsNameUnique](UpstreamEndpoint& Ep) { + IsNameUnique = IsNameUnique && Ep.GetEndpointInfo().Name != Name; + return IsNameUnique; + }); + + if (IsNameUnique == false) + { + return WriteErrorResponse(ServerRequest, "Url"sv, "Endpoint name is not unique"sv); + } + + std::unique_ptr<zen::UpstreamEndpoint> Endpoint; + + if (Type == "Zen"sv) + { + std::vector<std::string> Urls; + Urls.push_back(Json["Url"].string_value()); + Endpoint = zen::MakeZenUpstreamEndpoint({.Name = Name, .Urls = Urls}); + } + else + { + if (Namespace.empty()) + { + return WriteErrorResponse(ServerRequest, "Namespace"sv, "Invalid Horde namespace"sv); + } + + if (OAuthProvider.empty()) + { + return WriteErrorResponse(ServerRequest, "OAuthProvider"sv, "Invalid Horde OAuth provider URL"sv); + } + + if (OAuthToken.empty()) + { + if (OAuthClientId.empty()) + { + return WriteErrorResponse(ServerRequest, "OAuthClientId"sv, "Invalid Horde OAuth client ID"sv); + } + + if (OAuthSecret.empty()) + { + return WriteErrorResponse(ServerRequest, "OAuthSecret"sv, "Invalid Horde OAuth secret"sv); + } + } + + const zen::CloudCacheClientOptions Options = {.Name = Name, + .ServiceUrl = Url, + .DdcNamespace = Namespace, + .BlobStoreNamespace = Namespace, + .OAuthProvider = OAuthProvider, + .OAuthClientId = OAuthClientId, + .OAuthSecret = OAuthSecret, + .AccessToken = OAuthToken}; + + Endpoint = zen::MakeJupiterUpstreamEndpoint(Options); + } + + m_Upstream.RegisterEndpoint(std::move(Endpoint)); + + WriteSuccessResponse(ServerRequest); + }, + HttpVerb::kPost); +} + +HttpUpstreamService::~HttpUpstreamService() +{ +} + +const char* +HttpUpstreamService::BaseUri() const +{ + return "/upstream/"; +} + +void +HttpUpstreamService::HandleRequest(zen::HttpServerRequest& Request) +{ + m_Router.HandleRequest(Request); +} + +} // namespace zen diff --git a/zenserver/upstream/upstreamservice.h b/zenserver/upstream/upstreamservice.h new file mode 100644 index 000000000..0a42198c2 --- /dev/null +++ b/zenserver/upstream/upstreamservice.h @@ -0,0 +1,25 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +namespace zen { + +class UpstreamCache; + +class HttpUpstreamService final : public zen::HttpService +{ +public: + HttpUpstreamService(UpstreamCache& Upstream); + virtual ~HttpUpstreamService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + HttpRequestRouter m_Router; + UpstreamCache& m_Upstream; +}; + +} // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index e97ce755d..c2be2165a 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -102,6 +102,7 @@ struct ZenCacheResult struct ZenStructuredCacheClientOptions { + std::string_view Name; std::string_view Url; std::span<std::string const> Urls; std::chrono::milliseconds ConnectTimeout{}; diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 60691222b..b8648c8ee 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -99,6 +99,7 @@ ZEN_THIRD_PARTY_INCLUDES_END // #include "admin/admin.h" +#include "auth/authservice.h" #include "cache/structuredcache.h" #include "cache/structuredcachestore.h" #include "compute/apply.h" @@ -110,9 +111,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" -#include "upstream/jupiter.h" -#include "upstream/upstreamcache.h" -#include "upstream/zen.h" +#include "upstream/upstream.h" #include "zenstore/gc.h" #include "zenstore/scrub.h" @@ -203,6 +202,10 @@ public: m_Http = zen::CreateHttpServer(ServerOptions.HttpServerClass); m_Http->Initialize(ServerOptions.BasePort); + + m_AuthService = std::make_unique<zen::HttpAuthService>(); + m_Http->RegisterService(*m_AuthService); + m_Http->RegisterService(m_HealthService); m_Http->RegisterService(m_StatsService); m_Http->RegisterService(m_StatusService); @@ -278,11 +281,6 @@ public: m_Http->RegisterService(m_CasService); - if (m_StructuredCacheService) - { - m_Http->RegisterService(*m_StructuredCacheService); - } - #if ZEN_WITH_COMPUTE_SERVICES if (m_HttpLaunchService) { @@ -529,6 +527,7 @@ private: } zen::Ref<zen::HttpServer> m_Http; + std::unique_ptr<zen::HttpAuthService> m_AuthService; zen::HttpStatusService m_StatusService; zen::HttpStatsService m_StatsService; zen::CasGc m_CasGc; @@ -543,6 +542,8 @@ private: zen::RefPtr<zen::ProjectStore> m_ProjectStore; zen::Ref<zen::LocalProjectService> m_LocalProjectService; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; + std::unique_ptr<zen::UpstreamCache> m_UpstreamCache; + std::unique_ptr<zen::HttpUpstreamService> m_UpstreamService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; zen::HttpAdminService m_AdminService{m_GcScheduler}; zen::HttpHealthService m_HealthService; @@ -682,24 +683,23 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) ZEN_INFO("instantiating structured cache service"); m_CacheStore = std::make_unique<ZenCacheStore>(m_CasGc, m_DataRoot / "cache"); - std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) - { - const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; - - zen::UpstreamCacheOptions UpstreamOptions; - UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; - UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; - if (UpstreamConfig.UpstreamThreadCount < 32) - { - UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); - } + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServerOptions.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; - UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } - UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + m_UpstreamService = std::make_unique<HttpUpstreamService>(*m_UpstreamCache); + m_UpstreamCache->Initialize(); + if (ServerOptions.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { // Zen upstream { std::vector<std::string> ZenUrls = UpstreamConfig.ZenConfig.Urls; @@ -723,10 +723,11 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) if (!ZenUrls.empty()) { std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = - zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls, + zen::MakeZenUpstreamEndpoint({.Name = "Zen"sv, + .Urls = ZenUrls, .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); - UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + m_UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } @@ -736,7 +737,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) if (UpstreamConfig.JupiterConfig.UseProductionSettings) { Options = - zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + zen::CloudCacheClientOptions{.Name = "Horde"sv, + .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, .DdcNamespace = "ue.ddc"sv, .BlobStoreNamespace = "ue.ddc"sv, .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, @@ -749,7 +751,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) { Options = - zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, + zen::CloudCacheClientOptions{.Name = "Horde"sv, + .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, .DdcNamespace = "ue.ddc"sv, .BlobStoreNamespace = "ue.ddc"sv, .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, @@ -771,27 +774,16 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) if (!Options.ServiceUrl.empty()) { std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + m_UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } - - if (UpstreamCache->Initialize()) - { - ZEN_INFO("upstream cache active ({})", - UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" - : UpstreamOptions.ReadUpstream ? "READONLY" - : UpstreamOptions.WriteUpstream ? "WRITEONLY" - : "DISABLED"); - } - else - { - UpstreamCache.reset(); - ZEN_INFO("NOT using upstream cache"); - } } - m_StructuredCacheService.reset( - new zen::HttpStructuredCacheService(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, std::move(UpstreamCache))); + m_StructuredCacheService = + std::make_unique<HttpStructuredCacheService>(*m_CacheStore, *m_CidStore, m_StatsService, m_StatusService, *m_UpstreamCache); + + m_Http->RegisterService(*m_StructuredCacheService); + m_Http->RegisterService(*m_UpstreamService); } //////////////////////////////////////////////////////////////////////////////// |