aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-01-23 12:49:46 +0100
committerDan Engelbrecht <[email protected]>2025-01-23 12:49:46 +0100
commit5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c (patch)
treecb64f2d93a9f5e8b0e1529ab0dc3371602688a4f /src/zenutil
parentchangelog (diff)
parenthandle special backslash followed by quote for paths (#279) (diff)
downloadzen-5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c.tar.xz
zen-5d47e5946da5ccdba5bd2fb770b7bdfabb48fb4c.zip
Merge remote-tracking branch 'origin/main' into de/zen-service-command
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/include/zenutil/jupiter.h256
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterclient.h57
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h152
-rw-r--r--src/zenutil/jupiter.cpp660
-rw-r--r--src/zenutil/jupiter/jupiterclient.cpp29
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp505
6 files changed, 743 insertions, 916 deletions
diff --git a/src/zenutil/include/zenutil/jupiter.h b/src/zenutil/include/zenutil/jupiter.h
deleted file mode 100644
index 50e4ad68a..000000000
--- a/src/zenutil/include/zenutil/jupiter.h
+++ /dev/null
@@ -1,256 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zenbase/refcount.h>
-#include <zencore/iohash.h>
-#include <zencore/logging.h>
-#include <zencore/thread.h>
-#include <zenhttp/httpclient.h>
-#include <zenhttp/httpserver.h>
-
-#include <atomic>
-#include <chrono>
-#include <list>
-#include <memory>
-#include <set>
-#include <vector>
-
-struct ZenCacheValue;
-
-namespace cpr {
-class Session;
-}
-
-namespace zen {
-
-class CbObjectView;
-class CloudCacheClient;
-class IoBuffer;
-struct IoHash;
-
-/**
- * Cached access token, for use with `Authorization:` header
- */
-struct CloudCacheAccessToken
-{
- using Clock = std::chrono::system_clock;
- using TimePoint = Clock::time_point;
-
- static constexpr int64_t ExpireMarginInSeconds = 30;
-
- std::string Value;
- TimePoint ExpireTime;
-
- bool IsValid() const
- {
- return Value.empty() == false &&
- ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count();
- }
-};
-
-struct CloudCacheResult
-{
- IoBuffer Response;
- uint64_t SentBytes{};
- uint64_t ReceivedBytes{};
- double ElapsedSeconds{};
- int32_t ErrorCode{};
- std::string Reason;
- bool Success = false;
-};
-
-struct PutRefResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeRefResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
-};
-
-struct CloudCacheExistsResult : CloudCacheResult
-{
- std::set<IoHash> Needs;
-};
-
-struct GetObjectReferencesResult : CloudCacheResult
-{
- std::set<IoHash> References;
-};
-
-struct PutBuildPartResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
- IoHash RawHash;
-};
-
-struct FinalizeBuildPartResult : CloudCacheResult
-{
- std::vector<IoHash> Needs;
-};
-
-/**
- * Context for performing Jupiter operations
- *
- * Maintains an HTTP connection so that subsequent operations don't need to go
- * through the whole connection setup process
- *
- */
-class CloudCacheSession
-{
-public:
- CloudCacheSession(CloudCacheClient* CacheClient);
- ~CloudCacheSession();
-
- CloudCacheResult Authenticate();
-
- CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
- CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
- CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath = {});
-
- PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
- CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
- CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
- CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
-
- FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
-
- CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
-
- GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
-
- CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
- CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key);
-
- CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
-
- std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
-
- CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- CloudCacheResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- CloudCacheResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath);
- CloudCacheResult PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- const IoBuffer& Payload);
- FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash);
- CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
-
- CloudCacheClient& Client() { return *m_CacheClient; };
-
-private:
- inline LoggerRef Log() { return m_Log; }
-
- CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
-
- CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
-
- LoggerRef m_Log;
- RefPtr<CloudCacheClient> m_CacheClient;
-};
-
-/**
- * Access token provider interface
- */
-class CloudCacheTokenProvider
-{
-public:
- virtual ~CloudCacheTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() = 0;
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token);
-
- struct OAuthClientCredentialsParams
- {
- std::string_view Url;
- std::string_view ClientId;
- std::string_view ClientSecret;
- };
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params);
-
- static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback);
-};
-
-struct CloudCacheClientOptions
-{
- std::string_view Name;
- std::string_view ServiceUrl;
- std::string_view DdcNamespace;
- std::string_view BlobStoreNamespace;
- std::string_view ComputeCluster;
- std::chrono::milliseconds ConnectTimeout{5000};
- std::chrono::milliseconds Timeout{};
- bool AssumeHttp2 = false;
- bool AllowResume = false;
- uint8_t RetryCount = 0;
-};
-
-/**
- * Jupiter upstream cache client
- */
-class CloudCacheClient : public RefCounted
-{
-public:
- CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
- ~CloudCacheClient();
-
- std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
- std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
- std::string_view ComputeCluster() const { return m_ComputeCluster; }
- std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
-
- LoggerRef Logger() { return m_Log; }
-
-private:
- LoggerRef m_Log;
- const std::string m_DefaultDdcNamespace;
- const std::string m_DefaultBlobStoreNamespace;
- const std::string m_ComputeCluster;
- const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
- HttpClient m_HttpClient;
-
- friend class CloudCacheSession;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
new file mode 100644
index 000000000..defe50edc
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter/jupiterclient.h
@@ -0,0 +1,57 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenbase/refcount.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <chrono>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterClientOptions
+{
+ std::string_view Name;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+ bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
+};
+
+/**
+ * Jupiter upstream cache client
+ */
+class JupiterClient : public RefCounted
+{
+public:
+ JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider);
+ ~JupiterClient();
+
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
+
+ LoggerRef Logger() { return m_Log; }
+ HttpClient& Client() { return m_HttpClient; }
+
+private:
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ std::function<HttpClientAccessToken()> m_TokenProvider;
+ HttpClient m_HttpClient;
+
+ friend class JupiterSession;
+};
+
+} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
new file mode 100644
index 000000000..6a80332f4
--- /dev/null
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -0,0 +1,152 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zenhttp/httpclient.h>
+
+#include <set>
+
+namespace zen {
+
+class IoBuffer;
+
+struct JupiterResult
+{
+ IoBuffer Response;
+ uint64_t SentBytes{};
+ uint64_t ReceivedBytes{};
+ double ElapsedSeconds{};
+ int32_t ErrorCode{};
+ std::string Reason;
+ bool Success = false;
+};
+
+struct PutRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeRefResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+struct JupiterExistsResult : JupiterResult
+{
+ std::set<IoHash> Needs;
+};
+
+struct GetObjectReferencesResult : JupiterResult
+{
+ std::set<IoHash> References;
+};
+
+struct PutBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+ IoHash RawHash;
+};
+
+struct FinalizeBuildPartResult : JupiterResult
+{
+ std::vector<IoHash> Needs;
+};
+
+/**
+ * Context for performing Jupiter operations
+ *
+ * Maintains an HTTP connection so that subsequent operations don't need to go
+ * through the whole connection setup process
+ *
+ */
+class JupiterSession
+{
+public:
+ JupiterSession(LoggerRef InLog, HttpClient& InHttpClient);
+ ~JupiterSession();
+
+ JupiterResult Authenticate();
+
+ JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
+ JupiterResult GetObject(std::string_view Namespace, const IoHash& Key);
+ JupiterResult GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath = {});
+
+ PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType);
+ JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob);
+ JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob);
+ JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object);
+
+ FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
+
+ JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key);
+
+ GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key);
+
+ JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key);
+ JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key);
+
+ JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+ JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
+
+ std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
+
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+ JupiterResult PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload);
+ FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash);
+ JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+
+private:
+ inline LoggerRef Log() { return m_Log; }
+
+ JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
+
+ JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
+
+ LoggerRef m_Log;
+ HttpClient& m_HttpClient;
+};
+
+} // namespace zen
diff --git a/src/zenutil/jupiter.cpp b/src/zenutil/jupiter.cpp
deleted file mode 100644
index df4af0c13..000000000
--- a/src/zenutil/jupiter.cpp
+++ /dev/null
@@ -1,660 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/jupiter.h>
-
-#include <zencore/basicfile.h>
-#include <zencore/compactbinary.h>
-#include <zencore/compositebuffer.h>
-#include <zencore/fmtutils.h>
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-#include <zencore/scopeguard.h>
-#include <zencore/thread.h>
-#include <zencore/trace.h>
-#include <zenhttp/formatters.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <fmt/format.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-//#if ZEN_PLATFORM_WINDOWS
-//# pragma comment(lib, "Crypt32.lib")
-//# pragma comment(lib, "Wldap32.lib")
-//#endif
-
-#include <json11.hpp>
-
-using namespace std::literals;
-
-namespace zen {
-
-namespace detail {
- CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
- {
- if (Response.Error)
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = Response.Error.value().ErrorCode,
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- if (!Response.IsSuccess())
- {
- return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = static_cast<int32_t>(Response.StatusCode),
- .Reason = Response.ErrorMessage(ErrorPrefix),
- .Success = false};
- }
- return {.Response = Response.ResponsePayload,
- .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
- .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
- .ElapsedSeconds = Response.ElapsedSeconds,
- .ErrorCode = 0,
- .Success = true};
- }
-} // namespace detail
-
-CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
-{
-}
-
-CloudCacheSession::~CloudCacheSession()
-{
-}
-
-CloudCacheResult
-CloudCacheSession::Authenticate()
-{
- bool OK = m_CacheClient->m_HttpClient.Authenticate();
- return {.Success = OK};
-}
-
-CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::GetRef");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- {HttpClient::Accept(RefType)});
-
- return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetBlob");
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- TempFolderPath,
- {HttpClient::Accept(ZenContentType::kCompressedBinary)});
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::GetInlineBlob(std::string_view Namespace,
- std::string_view BucketId,
- const IoHash& Key,
- IoHash& OutPayloadHash,
- std::filesystem::path TempFolderPath)
-{
- ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- TempFolderPath,
- {{"Accept", "application/x-jupiter-inline"}});
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
-
- if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
- {
- const std::string& PayloadHashHeader = It->second;
- if (PayloadHashHeader.length() == IoHash::StringLength)
- {
- OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
- }
- }
-
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObject");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- return detail::ConvertResponse(Response);
-}
-
-PutRefResult
-CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
-{
- ZEN_TRACE_CPU("JupiterClient::PutRef");
-
- Ref.SetContentType(RefType);
-
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
- Ref,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutRefResult Result = {detail::ConvertResponse(Response)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-FinalizeRefResult
-CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
-{
- ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
- fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
- {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
-
- FinalizeRefResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutBlob");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- Blob.SetContentType(ZenContentType::kCompressedBinary);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
-{
- ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
- Payload,
- ZenContentType::kCompressedBinary);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
-{
- ZEN_TRACE_CPU("JupiterClient::PutObject");
-
- Object.SetContentType(ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheResult
-CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::RefExists");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-GetObjectReferencesResult
-CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ReferencesResponse = Response.AsObject();
- for (auto& Item : ReferencesResponse["references"sv])
- {
- Result.References.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Key);
-}
-
-CloudCacheResult
-CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
-}
-
-CloudCacheResult
-CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
-{
- return CacheTypeExists(Namespace, "objects"sv, Key);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "blobs"sv, Keys);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
-{
- return CacheTypeExists(Namespace, "objects"sv, Keys);
-}
-
-std::vector<IoHash>
-CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
-{
- // ExtendableStringBuilder<256> Uri;
- // Uri << m_CacheClient->ServiceUrl();
- // Uri << "/api/v1/s/" << Namespace;
-
- ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
-
- return {};
-}
-
-CloudCacheResult
-CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
-
- return detail::ConvertResponse(Response);
-}
-
-CloudCacheExistsResult
-CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
-{
- ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
-
- ExtendableStringBuilder<256> Body;
- Body << "[";
- for (const auto& Key : Keys)
- {
- Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
- }
- Body << "]";
- IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
- Payload.SetContentType(ZenContentType::kJSON);
-
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
-
- CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
-
- if (Result.Success)
- {
- const CbObject ExistsResponse = Response.AsObject();
- for (auto& Item : ExistsResponse["needs"sv])
- {
- Result.Needs.insert(Item.AsHash());
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
-{
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
- return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv);
-}
-
-PutBuildPartResult
-CloudCacheSession::PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
-
- IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
-
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
- Payload,
- {{"X-Jupiter-IoHash", Hash.ToHexString()}});
-
- PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- Result.RawHash = Hash;
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
-{
- HttpClient::Response Response =
- m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload,
- ContentType);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- TempFolderPath);
- return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv);
-}
-
-CloudCacheResult
-CloudCacheSession::PutBlockMetadata(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- const IoBuffer& Payload)
-{
- ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
- Payload);
- return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv);
-}
-
-FinalizeBuildPartResult
-CloudCacheSession::FinalizeBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& RawHash)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
- HttpClient::Accept(ZenContentType::kCbObject));
-
- FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)};
- if (Result.Success)
- {
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
- if (JsonError.empty())
- {
- json11::Json::array Needs = Json["needs"].array_items();
- for (const auto& Need : Needs)
- {
- Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
- }
- }
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
-{
- HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(
- fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
- HttpClient::Accept(ZenContentType::kCbObject));
- return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv);
-}
-
-/**
- * An access token provider that holds a token that will never change.
- */
-class StaticTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {}
-
- virtual ~StaticTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; }
-
-private:
- CloudCacheAccessToken m_Token;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token)
-{
- return std::make_unique<StaticTokenProvider>(std::move(Token));
-}
-
-class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params)
- {
- m_Url = std::string(Params.Url);
- m_ClientId = std::string(Params.ClientId);
- m_ClientSecret = std::string(Params.ClientSecret);
- }
-
- virtual ~OAuthClientCredentialsTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override
- {
- using namespace std::chrono;
-
- std::string Body =
- fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret);
-
- cpr::Response Response =
- cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)});
-
- if (Response.error || Response.status_code != 200)
- {
- return {};
- }
-
- std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
-
- if (JsonError.empty() == false)
- {
- return {};
- }
-
- std::string Token = Json["access_token"].string_value();
- int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value());
- CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds);
-
- return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime};
- }
-
-private:
- std::string m_Url;
- std::string m_ClientId;
- std::string m_ClientSecret;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params)
-{
- return std::make_unique<OAuthClientCredentialsTokenProvider>(Params);
-}
-
-class CallbackTokenProvider final : public CloudCacheTokenProvider
-{
-public:
- CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {}
-
- virtual ~CallbackTokenProvider() = default;
-
- virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); }
-
-private:
- std::function<CloudCacheAccessToken()> m_Callback;
-};
-
-std::unique_ptr<CloudCacheTokenProvider>
-CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback)
-{
- return std::make_unique<CallbackTokenProvider>(std::move(Callback));
-}
-
-static std::optional<std::function<HttpClientAccessToken()>>
-GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
-{
- if (TokenProvider == nullptr)
- {
- return {};
- }
- auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
- CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
- return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
- };
- return ProviderFunc;
-}
-
-CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
-: m_Log(zen::logging::Get("jupiter"))
-, m_DefaultDdcNamespace(Options.DdcNamespace)
-, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
-, m_ComputeCluster(Options.ComputeCluster)
-, m_TokenProvider(std::move(TokenProvider))
-, m_HttpClient(Options.ServiceUrl,
- HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = Options.AllowResume,
- .RetryCount = Options.RetryCount})
-{
- ZEN_ASSERT(m_TokenProvider.get() != nullptr);
-}
-
-CloudCacheClient::~CloudCacheClient()
-{
-}
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp
new file mode 100644
index 000000000..5e5da3750
--- /dev/null
+++ b/src/zenutil/jupiter/jupiterclient.cpp
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter/jupiterclient.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider)
+: m_Log(zen::logging::Get("jupiter"sv))
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
+, m_ComputeCluster(Options.ComputeCluster)
+, m_TokenProvider(std::move(TokenProvider))
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = std::move(TokenProvider),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
+{
+}
+
+JupiterClient::~JupiterClient()
+{
+}
+
+} // namespace zen
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
new file mode 100644
index 000000000..f706a7efc
--- /dev/null
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -0,0 +1,505 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter/jupitersession.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/fmtutils.h>
+#include <zencore/trace.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+//#include <cpr/cpr.h>
+//#include <fmt/format.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+using namespace std::literals;
+
+namespace zen {
+
+namespace detail {
+ JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ return {.Response = Response.ResponsePayload,
+ .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = 0,
+ .Success = true};
+ }
+} // namespace detail
+
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient)
+{
+}
+
+JupiterSession::~JupiterSession()
+{
+}
+
+JupiterResult
+JupiterSession::Authenticate()
+{
+ bool OK = m_HttpClient.Authenticate();
+ return {.Success = OK};
+}
+
+JupiterResult
+JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)});
+
+ return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv);
+}
+
+JupiterResult
+JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
+
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
+
+ JupiterResult Result = detail::ConvertResponse(Response);
+
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
+ {
+ const std::string& PayloadHashHeader = It->second;
+ if (PayloadHashHeader.length() == IoHash::StringLength)
+ {
+ OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
+ }
+ }
+
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObject");
+
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ return detail::ConvertResponse(Response);
+}
+
+PutRefResult
+JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutRef");
+
+ Ref.SetContentType(RefType);
+
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ Ref,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutRefResult Result = {detail::ConvertResponse(Response)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+FinalizeRefResult
+JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+
+ FinalizeRefResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutBlob");
+
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutObject");
+
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterResult
+JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::RefExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+GetObjectReferencesResult
+JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ReferencesResponse = Response.AsObject();
+ for (auto& Item : ReferencesResponse["references"sv])
+ {
+ Result.References.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
+}
+
+JupiterResult
+JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Key);
+}
+
+JupiterExistsResult
+JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
+}
+
+JupiterExistsResult
+JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Keys);
+}
+
+std::vector<IoHash>
+JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
+{
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
+
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
+
+ return {};
+}
+
+JupiterResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+JupiterExistsResult
+JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ ExtendableStringBuilder<256> Body;
+ Body << "[";
+ for (const auto& Key : Keys)
+ {
+ Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
+ }
+ Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
+
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ JupiterExistsResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ExistsResponse = Response.AsObject();
+ for (auto& Item : ExistsResponse["needs"sv])
+ {
+ Result.Needs.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv);
+}
+
+JupiterResult
+JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
+ return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv);
+}
+
+PutBuildPartResult
+JupiterSession::PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+
+ IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
+ Payload,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv);
+}
+
+JupiterResult
+JupiterSession::PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload)
+{
+ HttpClient::Response Response = m_HttpClient.Upload(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload,
+ ContentType);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath)
+{
+ HttpClient::Response Response = m_HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ TempFolderPath);
+ return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Put(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload);
+ return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv);
+}
+
+FinalizeBuildPartResult
+JupiterSession::FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash)
+{
+ HttpClient::Response Response = m_HttpClient.Post(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
+ HttpClient::Accept(ZenContentType::kCbObject));
+
+ FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+JupiterResult
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
+}
+
+} // namespace zen