diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/upstreamcache.h | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/upstreamcache.h')
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.h | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h new file mode 100644 index 000000000..695c06b32 --- /dev/null +++ b/src/zenserver/upstream/upstreamcache.h @@ -0,0 +1,252 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinary.h> +#include <zencore/compress.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/stats.h> +#include <zencore/zencore.h> +#include <zenutil/cache/cache.h> + +#include <atomic> +#include <chrono> +#include <functional> +#include <memory> +#include <vector> + +namespace zen { + +class CbObjectView; +class AuthMgr; +class CbObjectView; +class CbPackage; +class CbObjectWriter; +class CidStore; +class ZenCacheStore; +struct CloudCacheClientOptions; +class CloudCacheTokenProvider; +struct ZenStructuredCacheClientOptions; + +struct UpstreamCacheRecord +{ + ZenContentType Type = ZenContentType::kBinary; + std::string Namespace; + CacheKey Key; + std::vector<IoHash> ValueContentIds; +}; + +struct UpstreamCacheOptions +{ + std::chrono::seconds HealthCheckInterval{5}; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; +}; + +struct UpstreamError +{ + int32_t ErrorCode{}; + std::string Reason{}; + + explicit operator bool() const { return ErrorCode != 0; } +}; + +struct UpstreamEndpointInfo +{ + std::string Name; + std::string Url; +}; + +struct GetUpstreamCacheResult +{ + UpstreamError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct GetUpstreamCacheSingleResult +{ + GetUpstreamCacheResult Status; + IoBuffer Value; + const UpstreamEndpointInfo* Source = nullptr; +}; + +struct PutUpstreamCacheResult +{ + std::string Reason; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct CacheRecordGetCompleteParams +{ + CacheKeyRequest& Request; + const CbObjectView& Record; + const CbPackage& Package; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; + +struct CacheValueGetCompleteParams +{ + CacheValueRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheValueGetComplete = std::function<void(CacheValueGetCompleteParams&&)>; + +struct CacheChunkGetCompleteParams +{ + CacheChunkRequest& Request; + IoHash RawHash; + uint64_t RawSize; + IoBuffer Value; + double ElapsedSeconds{}; + const UpstreamEndpointInfo* Source = nullptr; +}; + +using OnCacheChunksGetComplete = std::function<void(CacheChunkGetCompleteParams&&)>; + +struct UpstreamEndpointStats +{ + metrics::OperationTiming CacheGetRequestTiming; + metrics::OperationTiming CachePutRequestTiming; + metrics::Counter CacheGetTotalBytes; + metrics::Counter CachePutTotalBytes; + metrics::Counter CacheGetCount; + metrics::Counter CacheHitCount; + metrics::Counter CacheErrorCount; +}; + +enum class UpstreamEndpointState : uint32_t +{ + kDisabled, + kUnauthorized, + kError, + kOk +}; + +inline std::string_view +ToString(UpstreamEndpointState State) +{ + using namespace std::literals; + + switch (State) + { + case UpstreamEndpointState::kDisabled: + return "Disabled"sv; + case UpstreamEndpointState::kUnauthorized: + return "Unauthorized"sv; + case UpstreamEndpointState::kError: + return "Error"sv; + case UpstreamEndpointState::kOk: + return "Ok"sv; + default: + return "Unknown"sv; + } +} + +struct UpstreamAuthConfig +{ + std::string_view OAuthUrl; + std::string_view OAuthClientId; + std::string_view OAuthClientSecret; + std::string_view OpenIdProvider; + std::string_view AccessToken; +}; + +struct UpstreamEndpointStatus +{ + std::string Reason; + UpstreamEndpointState State; +}; + +/** + * The upstream endpoint is responsible for handling upload/downloading of cache records. + */ +class UpstreamEndpoint +{ +public: + virtual ~UpstreamEndpoint() = default; + + virtual UpstreamEndpointStatus Initialize() = 0; + + virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0; + + virtual UpstreamEndpointState GetState() = 0; + virtual UpstreamEndpointStatus GetStatus() = 0; + + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheResult GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, const CacheKey& CacheKey, const IoHash& PayloadId) = 0; + virtual GetUpstreamCacheResult GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) = 0; + + virtual UpstreamEndpointStats& Stats() = 0; + + static std::unique_ptr<UpstreamEndpoint> CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options); + + static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const CloudCacheClientOptions& Options, + const UpstreamAuthConfig& AuthConfig, + AuthMgr& Mgr); +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamCache +{ +public: + virtual ~UpstreamCache() = default; + + virtual void Initialize() = 0; + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; + virtual void IterateEndpoints(std::function<bool(UpstreamEndpoint&)>&& Fn) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheRecord(std::string_view Namespace, const CacheKey& CacheKey, ZenContentType Type) = 0; + virtual void GetCacheRecords(std::string_view Namespace, + std::span<CacheKeyRequest*> Requests, + OnCacheRecordGetComplete&& OnComplete) = 0; + + virtual void GetCacheValues(std::string_view Namespace, + std::span<CacheValueRequest*> CacheValueRequests, + OnCacheValueGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheSingleResult GetCacheChunk(std::string_view Namespace, + const CacheKey& CacheKey, + const IoHash& ValueContentId) = 0; + virtual void GetCacheChunks(std::string_view Namespace, + std::span<CacheChunkRequest*> CacheChunkRequests, + OnCacheChunksGetComplete&& OnComplete) = 0; + + virtual void EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; + + static std::unique_ptr<UpstreamCache> Create(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); +}; + +} // namespace zen |