diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/jupiter.h | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'src/zenserver/upstream/jupiter.h')
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h new file mode 100644 index 000000000..99e5c530f --- /dev/null +++ b/src/zenserver/upstream/jupiter.h @@ -0,0 +1,217 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zencore/refcount.h> +#include <zencore/thread.h> +#include <zenhttp/httpserver.h> + +#include <atomic> +#include <chrono> +#include <list> +#include <memory> +#include <set> +#include <vector> + +struct ZenCacheValue; + +namespace cpr { +class Session; +} + +namespace zen { +namespace detail { + struct CloudCacheSessionState; +} + +class CbObjectView; +class CloudCacheClient; +class IoBuffer; +struct IoHash; + +/** + * Cached access token, for use with `Authorization:` header + */ +struct CloudCacheAccessToken +{ + using Clock = std::chrono::system_clock; + using TimePoint = Clock::time_point; + + static constexpr int64_t ExpireMarginInSeconds = 30; + + std::string Value; + TimePoint ExpireTime; + + bool IsValid() const + { + return Value.empty() == false && + ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count(); + } +}; + +struct CloudCacheResult +{ + IoBuffer Response; + int64_t Bytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; + std::string Reason; + bool Success = false; +}; + +struct PutRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct FinalizeRefResult : CloudCacheResult +{ + std::vector<IoHash> Needs; +}; + +struct CloudCacheExistsResult : CloudCacheResult +{ + std::set<IoHash> Needs; +}; + +struct GetObjectReferencesResult : CloudCacheResult +{ + std::set<IoHash> References; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class CloudCacheSession +{ +public: + CloudCacheSession(CloudCacheClient* CacheClient); + ~CloudCacheSession(); + + CloudCacheResult Authenticate(); + CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); + CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash); + + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); + CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); + + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + + CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); + + CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); + + CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); + + CloudCacheResult PostComputeTasks(IoBuffer TasksData); + CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); + + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + CloudCacheClient& Client() { return *m_CacheClient; }; + +private: + inline spdlog::logger& Log() { return m_Log; } + cpr::Session& GetSession(); + CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); + bool VerifyAccessToken(long StatusCode); + + CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); + + CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); + + spdlog::logger& m_Log; + RefPtr<CloudCacheClient> m_CacheClient; + detail::CloudCacheSessionState* m_SessionState; +}; + +/** + * Access token provider interface + */ +class CloudCacheTokenProvider +{ +public: + virtual ~CloudCacheTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() = 0; + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token); + + struct OAuthClientCredentialsParams + { + std::string_view Url; + std::string_view ClientId; + std::string_view ClientSecret; + }; + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params); + + static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback); +}; + +struct CloudCacheClientOptions +{ + std::string_view Name; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; +}; + +/** + * Jupiter upstream cache client + */ +class CloudCacheClient : public RefCounted +{ +public: + CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider); + ~CloudCacheClient(); + + CloudCacheAccessToken AcquireAccessToken(); + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_ServiceUrl; } + + spdlog::logger& Logger() { return m_Log; } + +private: + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::string m_DefaultDdcNamespace; + std::string m_DefaultBlobStoreNamespace; + std::string m_ComputeCluster; + std::chrono::milliseconds m_ConnectTimeout{}; + std::chrono::milliseconds m_Timeout{}; + std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; + + RwLock m_SessionStateLock; + std::list<detail::CloudCacheSessionState*> m_SessionStateCache; + + detail::CloudCacheSessionState* AllocSessionState(); + void FreeSessionState(detail::CloudCacheSessionState*); + + friend class CloudCacheSession; +}; + +} // namespace zen |