aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/storage/upstream/upstreamcache.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-10-14 11:32:16 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 11:32:16 +0200
commitca09abbeef5b1788f4a52b61eedd2f3dd07f81f2 (patch)
tree005a50adfddf6982bab3a06bb93d4c50da1a11fd /src/zenserver/storage/upstream/upstreamcache.h
parentmake asiohttp work without IPv6 (#562) (diff)
downloadzen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.tar.xz
zen-ca09abbeef5b1788f4a52b61eedd2f3dd07f81f2.zip
move all storage-related services into storage tree (#571)
* move all storage-related services into storage tree * move config into config/ * also move admin service into storage since it mostly has storage related functionality * header consolidation
Diffstat (limited to 'src/zenserver/storage/upstream/upstreamcache.h')
-rw-r--r--src/zenserver/storage/upstream/upstreamcache.h167
1 files changed, 167 insertions, 0 deletions
diff --git a/src/zenserver/storage/upstream/upstreamcache.h b/src/zenserver/storage/upstream/upstreamcache.h
new file mode 100644
index 000000000..d5d61c8d9
--- /dev/null
+++ b/src/zenserver/storage/upstream/upstreamcache.h
@@ -0,0 +1,167 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/stats.h>
+#include <zencore/zencore.h>
+#include <zenstore/cache/cache.h>
+#include <zenstore/cache/upstreamcacheclient.h>
+
+#include <atomic>
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <vector>
+
+namespace zen {
+
+class CbObjectView;
+class AuthMgr;
+class CbObjectView;
+class CbPackage;
+class CbObjectWriter;
+class CidStore;
+class ZenCacheStore;
+struct JupiterClientOptions;
+class JupiterAccessTokenProvider;
+struct ZenStructuredCacheClientOptions;
+
+struct UpstreamEndpointStats
+{
+ metrics::OperationTiming CacheGetRequestTiming;
+ metrics::OperationTiming CachePutRequestTiming;
+ metrics::Counter CacheGetTotalBytes;
+ metrics::Counter CachePutTotalBytes;
+ metrics::Counter CacheGetCount;
+ metrics::Counter CacheHitCount;
+ metrics::Counter CacheErrorCount;
+};
+
+enum class UpstreamEndpointState : uint32_t
+{
+ kDisabled,
+ kUnauthorized,
+ kError,
+ kOk
+};
+
+inline std::string_view
+ToString(UpstreamEndpointState State)
+{
+ using namespace std::literals;
+
+ switch (State)
+ {
+ case UpstreamEndpointState::kDisabled:
+ return "Disabled"sv;
+ case UpstreamEndpointState::kUnauthorized:
+ return "Unauthorized"sv;
+ case UpstreamEndpointState::kError:
+ return "Error"sv;
+ case UpstreamEndpointState::kOk:
+ return "Ok"sv;
+ default:
+ return "Unknown"sv;
+ }
+}
+
+struct UpstreamAuthConfig
+{
+ std::string_view OAuthUrl;
+ std::string_view OAuthClientId;
+ std::string_view OAuthClientSecret;
+ std::string_view OpenIdProvider;
+ std::string_view AccessToken;
+};
+
+struct UpstreamEndpointStatus
+{
+ std::string Reason;
+ UpstreamEndpointState State;
+};
+
+struct GetUpstreamCacheSingleResult
+{
+ GetUpstreamCacheResult Status;
+ IoBuffer Value;
+ const UpstreamEndpointInfo* Source = nullptr;
+};
+
+/**
+ * The upstream endpoint is responsible for handling upload/downloading of cache records.
+ */
+class UpstreamEndpoint
+{
+public:
+ virtual ~UpstreamEndpoint() = default;
+
+ virtual UpstreamEndpointStatus Initialize() = 0;
+
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
+
+ virtual UpstreamEndpointState GetState() = 0;
+ virtual UpstreamEndpointStatus GetStatus() = 0;
+
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace,
+ std::span<CacheKeyRequest*> Requests,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
+
+ virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace,
+ std::span<CacheValueRequest*> CacheValueRequests,
+ OnCacheValueGetComplete&& OnComplete) = 0;
+
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
+ virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace,
+ std::span<CacheChunkRequest*> CacheChunkRequests,
+ OnCacheChunksGetComplete&& OnComplete) = 0;
+
+ virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
+ IoBuffer RecordValue,
+ std::span<IoBuffer const> Payloads) = 0;
+
+ virtual UpstreamEndpointStats& Stats() = 0;
+
+ static std::unique_ptr<UpstreamEndpoint> CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options);
+
+ static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const JupiterClientOptions& Options,
+ const UpstreamAuthConfig& AuthConfig,
+ AuthMgr& Mgr);
+};
+
+/**
+ * Manages one or more upstream cache endpoints.
+ */
+
+class UpstreamCache : public UpstreamCacheClient
+{
+public:
+ virtual void Initialize() = 0;
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
+ virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0;
+
+ virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0;
+
+ virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace,
+ const CacheKey& CacheKey,
+ const IoHash& ValueContentId) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
+
+struct UpstreamCacheOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
+};
+
+std::unique_ptr<UpstreamCache> CreateUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
+
+} // namespace zen