// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

// NOTE(review): all twelve include directives below have no header name. The targets
// (presumably written in angle brackets) appear to have been stripped from this copy
// of the file; restore them from the original header before building.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace zen {

// Forward declarations.
// NOTE(review): CbObjectView is forward-declared twice; the second declaration is
// redundant (harmless, but can be dropped).
class CbObjectView;
class AuthMgr;
class CbObjectView;
class CbPackage;
class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
class CloudCacheTokenProvider;
struct ZenStructuredCacheClientOptions;

/**
 * Describes a cache record by content type, namespace, key and a list of value
 * content ids. Passed to UpstreamEndpoint::PutCacheRecord and
 * UpstreamCache::EnqueueUpstream.
 */
struct UpstreamCacheRecord
{
    ZenContentType Type = ZenContentType::kBinary;  // defaults to binary content
    std::string Namespace;
    CacheKey Key;
    // NOTE(review): std::vector has no element type here; the template argument
    // appears to have been stripped and must be restored.
    std::vector ValueContentIds;
};

/**
 * Options accepted by UpstreamCache::Create.
 *
 * Defaults: 5 second health check interval, 4 threads, reading and writing enabled.
 */
struct UpstreamCacheOptions
{
    std::chrono::seconds HealthCheckInterval{5};
    uint32_t ThreadCount = 4;
    bool ReadUpstream = true;
    bool WriteUpstream = true;
};

/**
 * Error code plus a textual reason.
 */
struct UpstreamError
{
    int32_t ErrorCode{};
    std::string Reason{};

    /** True when an error is present, i.e. ErrorCode is non-zero. */
    explicit operator bool() const { return ErrorCode != 0; }
};

/**
 * Name and URL identifying an upstream endpoint.
 */
struct UpstreamEndpointInfo
{
    std::string Name;
    std::string Url;
};

/**
 * Outcome of a get operation: error, byte count, elapsed time in seconds and a
 * success flag that defaults to false.
 */
struct GetUpstreamCacheResult
{
    UpstreamError Error{};
    int64_t Bytes{};
    double ElapsedSeconds{};
    bool Success = false;
};

/**
 * Result of fetching a single item: the operation status, the value buffer and a
 * pointer to the info of the endpoint that produced it.
 */
struct GetUpstreamCacheSingleResult
{
    GetUpstreamCacheResult Status;
    IoBuffer Value;
    // Null by default. NOTE(review): presumably non-owning - confirm against the implementation.
    const UpstreamEndpointInfo* Source = nullptr;
};

/**
 * Outcome of a put operation. Unlike GetUpstreamCacheResult it carries a plain
 * Reason string rather than an UpstreamError.
 */
struct PutUpstreamCacheResult
{
    std::string Reason;
    int64_t Bytes{};
    double ElapsedSeconds{};
    bool Success = false;
};

/**
 * Arguments handed to an OnCacheRecordGetComplete callback.
 *
 * Holds references, so an instance must not outlive the request, record and
 * package it refers to.
 */
struct CacheRecordGetCompleteParams
{
    CacheKeyRequest& Request;
    const CbObjectView& Record;
    const CbPackage& Package;
    double ElapsedSeconds{};
    const UpstreamEndpointInfo* Source = nullptr;
};

// NOTE(review): std::function has no call signature here; the template argument
// appears to have been stripped. Presumably the callback receives a
// CacheRecordGetCompleteParams - confirm against the original header.
using OnCacheRecordGetComplete = std::function;

/**
 * Arguments handed to an OnCacheValueGetComplete callback.
 *
 * Request is held by reference and must outlive this object.
 */
struct CacheValueGetCompleteParams
{
    CacheValueRequest& Request;
    IoHash RawHash;
    uint64_t RawSize;
    IoBuffer Value;
    double ElapsedSeconds{};
    const UpstreamEndpointInfo* Source = nullptr;
};

// NOTE(review): std::function template argument missing here as well; presumably
// the callback receives a CacheValueGetCompleteParams - confirm.
using OnCacheValueGetComplete = std::function;

/**
 * Arguments handed to an OnCacheChunksGetComplete callback.
 *
 * Request is held by reference and must outlive this object.
 */
struct CacheChunkGetCompleteParams
{
    CacheChunkRequest& Request;
    IoHash RawHash;
    uint64_t RawSize;
    IoBuffer Value;
    double ElapsedSeconds{};
    const UpstreamEndpointInfo* Source = nullptr;
};

// NOTE(review): std::function template argument missing here as well; presumably
// the callback receives a CacheChunkGetCompleteParams - confirm.
using OnCacheChunksGetComplete = std::function;

/**
 * Per-endpoint metrics: request timings for get/put plus byte, request, hit and
 * error counters. Exposed through UpstreamEndpoint::Stats().
 */
struct UpstreamEndpointStats
{
    metrics::OperationTiming CacheGetRequestTiming;
    metrics::OperationTiming CachePutRequestTiming;
    metrics::Counter CacheGetTotalBytes;
    metrics::Counter CachePutTotalBytes;
    metrics::Counter CacheGetCount;
    metrics::Counter CacheHitCount;
    metrics::Counter CacheErrorCount;
};

/**
 * State of an upstream endpoint. The underlying type is uint32_t and kDisabled
 * has the value 0.
 */
enum class UpstreamEndpointState : uint32_t
{
    kDisabled,
    kUnauthorized,
    kError,
    kOk
};

/**
 * Returns the display name of an endpoint state.
 *
 * The returned view refers to a string literal. Values that do not match any
 * enumerator yield "Unknown".
 */
inline std::string_view ToString(UpstreamEndpointState State)
{
    using namespace std::literals;

    switch (State)
    {
        case UpstreamEndpointState::kDisabled:
            return "Disabled"sv;
        case UpstreamEndpointState::kUnauthorized:
            return "Unauthorized"sv;
        case UpstreamEndpointState::kError:
            return "Error"sv;
        case UpstreamEndpointState::kOk:
            return "Ok"sv;
        default:
            return "Unknown"sv;
    }
}

/**
 * Authentication settings passed to UpstreamEndpoint::CreateJupiterEndpoint.
 *
 * Every member is a std::string_view, i.e. a non-owning view: the character data
 * it points at must stay alive for as long as this struct is used.
 */
struct UpstreamAuthConfig
{
    std::string_view OAuthUrl;
    std::string_view OAuthClientId;
    std::string_view OAuthClientSecret;
    std::string_view OpenIdProvider;
    std::string_view AccessToken;
};

/**
 * Endpoint state together with a textual reason. Returned by
 * UpstreamEndpoint::Initialize and UpstreamEndpoint::GetStatus.
 */
struct UpstreamEndpointStatus
{
    std::string Reason;
    // NOTE(review): State has no default member initializer, so it is indeterminate
    // when the struct is default-initialized (e.g. `UpstreamEndpointStatus S;`).
    UpstreamEndpointState State;
};

/**
 * The upstream endpoint is responsible for handling uploading/downloading of cache records.
 */
// NOTE(review): several declarations in this class use std::span and std::unique_ptr
// without template arguments. The arguments appear to have been stripped from this
// copy of the file and must be restored from the original header before this compiles.
class UpstreamEndpoint
{
public:
    virtual ~UpstreamEndpoint() = default;

    // Returns the endpoint status (reason text plus state).
    // NOTE(review): presumably performs connection/authentication setup - not visible here.
    virtual UpstreamEndpointStatus Initialize() = 0;

    // Returns a reference to the endpoint's name and URL.
    virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;

    // Returns the endpoint state enumerator only.
    virtual UpstreamEndpointState GetState() = 0;

    // Returns the endpoint state together with its textual reason.
    virtual UpstreamEndpointStatus GetStatus() = 0;

    // Fetches a single cache record identified by namespace and key, with the given
    // content type. The result carries the status, the value buffer and the source.
    // Note that the parameter named CacheKey hides the CacheKey type inside the
    // parameter list.
    virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
                                                        const CacheKey& CacheKey,
                                                        ZenContentType Type) = 0;

    // Batch variant for records. Takes a span of requests and a completion callback
    // (by rvalue reference) and returns one aggregate result.
    // NOTE(review): std::span element type missing; presumably OnComplete is invoked
    // per request - confirm against the implementation.
    virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
                                                   std::span Requests,
                                                   OnCacheRecordGetComplete&& OnComplete) = 0;

    // Batch variant for cache values; same shape as GetCacheRecords.
    // NOTE(review): std::span element type missing.
    virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
                                                  std::span CacheValueRequests,
                                                  OnCacheValueGetComplete&& OnComplete) = 0;

    // Fetches a single chunk identified by namespace, key and payload hash.
    virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
                                                       const CacheKey& CacheKey,
                                                       const IoHash& PayloadId) = 0;

    // Batch variant for chunks; same shape as GetCacheRecords.
    // NOTE(review): std::span element type missing.
    virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
                                                  std::span CacheChunkRequests,
                                                  OnCacheChunksGetComplete&& OnComplete) = 0;

    // Uploads a record: the record description, the record value buffer and a span
    // of payloads.
    // NOTE(review): std::span element type missing.
    virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
                                                  IoBuffer RecordValue,
                                                  std::span Payloads) = 0;

    // Returns a mutable reference to this endpoint's metrics.
    virtual UpstreamEndpointStats& Stats() = 0;

    // Factory for an endpoint configured from ZenStructuredCacheClientOptions.
    // NOTE(review): std::unique_ptr pointee type missing; presumably UpstreamEndpoint - confirm.
    static std::unique_ptr CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options);

    // Factory for an endpoint configured from CloudCacheClientOptions, an auth
    // configuration and an AuthMgr.
    // NOTE(review): std::unique_ptr pointee type missing; presumably UpstreamEndpoint - confirm.
    static std::unique_ptr CreateJupiterEndpoint(const CloudCacheClientOptions& Options,
                                                 const UpstreamAuthConfig& AuthConfig,
                                                 AuthMgr& Mgr);
};

/**
 * Manages one or more upstream cache endpoints.
 */
// NOTE(review): several declarations in this class use std::span, std::function and
// std::unique_ptr without template arguments. The arguments appear to have been
// stripped from this copy of the file and must be restored from the original header.
class UpstreamCache
{
public:
    virtual ~UpstreamCache() = default;

    virtual void Initialize() = 0;

    // Hands an endpoint over to the cache; the std::unique_ptr parameter transfers ownership.
    // NOTE(review): std::unique_ptr pointee type missing; presumably UpstreamEndpoint - confirm.
    virtual void RegisterEndpoint(std::unique_ptr Endpoint) = 0;

    // Calls Fn for the registered endpoints.
    // NOTE(review): std::function call signature missing, so the callback's arguments
    // and return value (e.g. whether it can stop the iteration) cannot be told from here.
    virtual void IterateEndpoints(std::function&& Fn) = 0;

    // Fetches a single cache record identified by namespace and key, with the given
    // content type. Note that the parameter named CacheKey hides the CacheKey type
    // inside the parameter list.
    virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace,
                                                        const CacheKey& CacheKey,
                                                        ZenContentType Type) = 0;

    // Batch variant for records. Unlike the UpstreamEndpoint counterpart this
    // returns void; results are delivered through OnComplete only.
    // NOTE(review): std::span element type missing.
    virtual void GetCacheRecords(std::string_view Namespace,
                                 std::span Requests,
                                 OnCacheRecordGetComplete&& OnComplete) = 0;

    // Batch variant for cache values; returns void, results go through OnComplete.
    // NOTE(review): std::span element type missing.
    virtual void GetCacheValues(std::string_view Namespace,
                                std::span CacheValueRequests,
                                OnCacheValueGetComplete&& OnComplete) = 0;

    // Fetches a single chunk identified by namespace, key and content hash.
    // The hash parameter is named ValueContentId here but PayloadId on
    // UpstreamEndpoint::GetCacheChunk.
    virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
                                                       const CacheKey& CacheKey,
                                                       const IoHash& ValueContentId) = 0;

    // Batch variant for chunks; returns void, results go through OnComplete.
    // NOTE(review): std::span element type missing.
    virtual void GetCacheChunks(std::string_view Namespace,
                                std::span CacheChunkRequests,
                                OnCacheChunksGetComplete&& OnComplete) = 0;

    // Takes a cache record by value for upstream processing.
    // NOTE(review): presumably queued and uploaded asynchronously (see
    // UpstreamCacheOptions::ThreadCount) - confirm against the implementation.
    virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;

    // Writes status information into the supplied compact-binary object writer.
    virtual void GetStatus(CbObjectWriter& CbO) = 0;

    // Factory. Takes the options plus references to the cache store and CID store.
    // Note that the parameter named CidStore hides the CidStore type inside the
    // parameter list.
    // NOTE(review): std::unique_ptr pointee type missing; presumably UpstreamCache - confirm.
    static std::unique_ptr Create(const UpstreamCacheOptions& Options,
                                  ZenCacheStore& CacheStore,
                                  CidStore& CidStore);
};

}  // namespace zen