aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/jupiter.h
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-02 10:01:47 +0200
committerGitHub <[email protected]>2023-05-02 10:01:47 +0200
commit075d17f8ada47e990fe94606c3d21df409223465 (patch)
treee50549b766a2f3c354798a54ff73404217b4c9af /src/zenserver/upstream/jupiter.h
parentfix: bundle shouldn't append content zip to zen (diff)
downloadzen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz
zen-075d17f8ada47e990fe94606c3d21df409223465.zip
moved source directories into `/src` (#264)
* moved source directories into `/src` * updated bundle.lua for new `src` path * moved some docs, icon * removed old test trees
Diffstat (limited to 'src/zenserver/upstream/jupiter.h')
-rw-r--r--src/zenserver/upstream/jupiter.h217
1 files changed, 217 insertions, 0 deletions
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
new file mode 100644
index 000000000..99e5c530f
--- /dev/null
+++ b/src/zenserver/upstream/jupiter.h
@@ -0,0 +1,217 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zencore/refcount.h>
+#include <zencore/thread.h>
+#include <zenhttp/httpserver.h>
+
+#include <atomic>
+#include <chrono>
+#include <list>
+#include <memory>
+#include <set>
+#include <vector>
+
+struct ZenCacheValue;
+
+namespace cpr {
+class Session;
+}
+
+namespace zen {
+namespace detail {
+ struct CloudCacheSessionState;
+}
+
+class CbObjectView;
+class CloudCacheClient;
+class IoBuffer;
+struct IoHash;
+
+/**
+ * Cached access token, for use with `Authorization:` header
+ */
+struct CloudCacheAccessToken
+{
+ using Clock = std::chrono::system_clock;
+ using TimePoint = Clock::time_point;
+
+ static constexpr int64_t ExpireMarginInSeconds = 30;
+
+ std::string Value;
+ TimePoint ExpireTime;
+
+ bool IsValid() const
+ {
+ return Value.empty() == false &&
+ ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count();
+ }
+};
+
+struct CloudCacheResult
+{
+ IoBuffer Response;
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct FinalizeRefResult : CloudCacheResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct CloudCacheExistsResult : CloudCacheResult
+{
+ std::set<IoHash> Needs;
+};
+
+struct GetObjectReferencesResult : CloudCacheResult
+{
+ std::set<IoHash> References;
+};
+
+/**
+ * Context for performing Jupiter operations
+ *
+ * Maintains an HTTP connection so that subsequent operations don't need to go
+ * through the whole connection setup process
+ *
+ */
+class CloudCacheSession
+{
+public:
+ CloudCacheSession(CloudCacheClient* CacheClient);
+ ~CloudCacheSession();
+
+ CloudCacheResult Authenticate();
+ CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash);
+
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
+ CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
+
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
+ CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
+
+ CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key);
+
+ CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+
+ CloudCacheResult PostComputeTasks(IoBuffer TasksData);
+ CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
+
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ CloudCacheClient& Client() { return *m_CacheClient; };
+
+private:
+ inline spdlog::logger& Log() { return m_Log; }
+ cpr::Session& GetSession();
+ CloudCacheAccessToken GetAccessToken(bool RefreshToken = false);
+ bool VerifyAccessToken(long StatusCode);
+
+ CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
+
+ CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
+
+ spdlog::logger& m_Log;
+ RefPtr<CloudCacheClient> m_CacheClient;
+ detail::CloudCacheSessionState* m_SessionState;
+};
+
+/**
+ * Access token provider interface
+ */
+class CloudCacheTokenProvider
+{
+public:
+ virtual ~CloudCacheTokenProvider() = default;
+
+ virtual CloudCacheAccessToken AcquireAccessToken() = 0;
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token);
+
+ struct OAuthClientCredentialsParams
+ {
+ std::string_view Url;
+ std::string_view ClientId;
+ std::string_view ClientSecret;
+ };
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params);
+
+ static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback);
+};
+
+struct CloudCacheClientOptions
+{
+ std::string_view Name;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+};
+
+/**
+ * Jupiter upstream cache client
+ */
+class CloudCacheClient : public RefCounted
+{
+public:
+ CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
+ ~CloudCacheClient();
+
+ CloudCacheAccessToken AcquireAccessToken();
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_ServiceUrl; }
+
+ spdlog::logger& Logger() { return m_Log; }
+
+private:
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::string m_DefaultDdcNamespace;
+ std::string m_DefaultBlobStoreNamespace;
+ std::string m_ComputeCluster;
+ std::chrono::milliseconds m_ConnectTimeout{};
+ std::chrono::milliseconds m_Timeout{};
+ std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
+
+ RwLock m_SessionStateLock;
+ std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
+
+ detail::CloudCacheSessionState* AllocSessionState();
+ void FreeSessionState(detail::CloudCacheSessionState*);
+
+ friend class CloudCacheSession;
+};
+
+} // namespace zen