diff options
| author | Stefan Boberg <[email protected]> | 2025-10-14 11:32:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 11:32:16 +0200 |
| commit | ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch) | |
| tree | 005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/upstream/upstreamcache.h | |
| parent | make asiohttp work without IPv6 (#562) (diff) | |
| download | zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip | |
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree
* move config into config/
* also move admin service into storage since it mostly has storage related functionality
* header consolidation
Diffstat (limited to 'src/zenserver/storage/upstream/upstreamcache.h')
| -rw-r--r-- | src/zenserver/storage/upstream/upstreamcache.h | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/src/zenserver/storage/upstream/upstreamcache.h b/src/zenserver/storage/upstream/upstreamcache.h new file mode 100644 index 000000000..d5d61c8d9 --- /dev/null +++ b/src/zenserver/storage/upstream/upstreamcache.h @@ -0,0 +1,167 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/compress.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/stats.h> +#include <zencore/zencore.h> +#include <zenstore/cache/cache.h> +#include <zenstore/cache/upstreamcacheclient.h> + +#include <atomic> +#include <chrono> +#include <functional> +#include <memory> +#include <vector> + +namespace zen { + +class CbObjectView; +class AuthMgr; +class CbObjectView; +class CbPackage; +class CbObjectWriter; +class CidStore; +class ZenCacheStore; +struct JupiterClientOptions; +class JupiterAccessTokenProvider; +struct ZenStructuredCacheClientOptions; + +struct UpstreamEndpointStats +{ + metrics::OperationTiming CacheGetRequestTiming; + metrics::OperationTiming CachePutRequestTiming; + metrics::Counter CacheGetTotalBytes; + metrics::Counter CachePutTotalBytes; + metrics::Counter CacheGetCount; + metrics::Counter CacheHitCount; + metrics::Counter CacheErrorCount; +}; + +enum class UpstreamEndpointState : uint32_t +{ + kDisabled, + kUnauthorized, + kError, + kOk +}; + +inline std::string_view +ToString(UpstreamEndpointState State) +{ + using namespace std::literals; + + switch (State) + { + case UpstreamEndpointState::kDisabled: + return "Disabled"sv; + case UpstreamEndpointState::kUnauthorized: + return "Unauthorized"sv; + case UpstreamEndpointState::kError: + return "Error"sv; + case UpstreamEndpointState::kOk: + return "Ok"sv; + default: + return "Unknown"sv; + } +} + +struct UpstreamAuthConfig +{ + std::string_view OAuthUrl; + std::string_view OAuthClientId; + std::string_view OAuthClientSecret; + std::string_view OpenIdProvider; + std::string_view AccessToken; +}; + +struct UpstreamEndpointStatus +{ + std::string Reason; + UpstreamEndpointState State; +}; + +struct GetUpstreamCacheSingleResult +{ + GetUpstreamCacheResult Status; + IoBuffer Value; + const UpstreamEndpointInfo* Source = nullptr; +}; + +/** + * The upstream endpoint is responsible for handling upload/downloading of cache records. + */ +class UpstreamEndpoint +{ +public: + virtual ~UpstreamEndpoint() = default; + + virtual UpstreamEndpointStatus Initialize() = 0; + + virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0; + + virtual UpstreamEndpointState GetState() = 0; + virtual UpstreamEndpointStatus GetStatus() = 0; + + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) = 0; + + virtual UpstreamEndpointStats& Stats() = 0; + + static std::unique_ptr<UpstreamEndpoint> CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options); + + static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const JupiterClientOptions& Options, + const UpstreamAuthConfig& AuthConfig, + AuthMgr& Mgr); +}; + +/** + * Manages one or more upstream cache endpoints. + */ + +class UpstreamCache : public UpstreamCacheClient +{ +public: + virtual void Initialize() = 0; + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; + virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; +}; + +struct UpstreamCacheOptions +{ + std::chrono::seconds HealthCheckInterval{5}; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; +}; + +std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); + +} // namespace zen |