aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/jupiter.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-01-16 09:19:08 +0100
committerGitHub Enterprise <[email protected]>2025-01-16 09:19:08 +0100
commita5158f9fc806d506590dd9bf0e3282cb76c3ac4e (patch)
tree95a6dd46ad0520de4018e08ef6b3f409e25af3c3 /src/zenutil/jupiter.cpp
parent5.5.17 (diff)
downloadzen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.tar.xz
zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.zip
move basicfile.h/cpp -> zencore (#273)
move jupiter.h/cpp -> zenutil move packageformat.h/.cpp -> zenhttp zenutil now depends on zenhttp instead of the inverse
Diffstat (limited to 'src/zenutil/jupiter.cpp')
-rw-r--r--src/zenutil/jupiter.cpp660
1 files changed, 660 insertions, 0 deletions
diff --git a/src/zenutil/jupiter.cpp b/src/zenutil/jupiter.cpp
new file mode 100644
index 000000000..df4af0c13
--- /dev/null
+++ b/src/zenutil/jupiter.cpp
@@ -0,0 +1,660 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/jupiter.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/scopeguard.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
+#include <zenhttp/formatters.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+//#if ZEN_PLATFORM_WINDOWS
+//# pragma comment(lib, "Crypt32.lib")
+//# pragma comment(lib, "Wldap32.lib")
+//#endif
+
+#include <json11.hpp>
+
+using namespace std::literals;
+
+namespace zen {
+
+namespace detail {
+ CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
+ {
+ if (Response.Error)
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ if (!Response.IsSuccess())
+ {
+ return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
+ .Success = false};
+ }
+ return {.Response = Response.ResponsePayload,
+ .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes),
+ .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes),
+ .ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = 0,
+ .Success = true};
+ }
+} // namespace detail
+
+CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
+{
+}
+
+CloudCacheSession::~CloudCacheSession()
+{
+}
+
+CloudCacheResult
+CloudCacheSession::Authenticate()
+{
+ bool OK = m_CacheClient->m_HttpClient.Authenticate();
+ return {.Success = OK};
+}
+
+CloudCacheResult
+CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ {HttpClient::Accept(RefType)});
+
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
+
+ CloudCacheResult Result = detail::ConvertResponse(Response);
+
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
+ {
+ const std::string& PayloadHashHeader = It->second;
+ if (PayloadHashHeader.length() == IoHash::StringLength)
+ {
+ OutPayloadHash = IoHash::FromHexString(PayloadHashHeader);
+ }
+ }
+
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObject");
+
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ return detail::ConvertResponse(Response);
+}
+
+PutRefResult
+CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutRef");
+
+ Ref.SetContentType(RefType);
+
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ Ref,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutRefResult Result = {detail::ConvertResponse(Response)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+FinalizeRefResult
+CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
+{
+ ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
+
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+
+ FinalizeRefResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutBlob");
+
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
+{
+ ZEN_TRACE_CPU("JupiterClient::PutObject");
+
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheResult
+CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::RefExists");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+GetObjectReferencesResult
+CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ReferencesResponse = Response.AsObject();
+ for (auto& Item : ReferencesResponse["references"sv])
+ {
+ Result.References.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Key);
+}
+
+CloudCacheResult
+CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Key);
+}
+
+CloudCacheResult
+CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Key);
+}
+
+CloudCacheExistsResult
+CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "blobs"sv, Keys);
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys);
+}
+
+CloudCacheExistsResult
+CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists(Namespace, "objects"sv, Keys);
+}
+
+std::vector<IoHash>
+CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
+{
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
+
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
+
+ return {};
+}
+
+CloudCacheResult
+CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
+
+ return detail::ConvertResponse(Response);
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
+
+ ExtendableStringBuilder<256> Body;
+ Body << "[";
+ for (const auto& Key : Keys)
+ {
+ Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
+ }
+ Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
+
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+
+ CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
+
+ if (Result.Success)
+ {
+ const CbObject ExistsResponse = Response.AsObject();
+ for (auto& Item : ExistsResponse["needs"sv])
+ {
+ Result.Needs.insert(Item.AsHash());
+ }
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+{
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId));
+ return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv);
+}
+
+PutBuildPartResult
+CloudCacheSession::PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+
+ IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size());
+
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName),
+ Payload,
+ {{"X-Jupiter-IoHash", Hash.ToHexString()}});
+
+ PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ Result.RawHash = Hash;
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload,
+ ContentType);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ TempFolderPath);
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv);
+}
+
+CloudCacheResult
+CloudCacheSession::PutBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
+ Payload);
+ return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv);
+}
+
+FinalizeBuildPartResult
+CloudCacheSession::FinalizeBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& RawHash)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()),
+ HttpClient::Accept(ZenContentType::kCbObject));
+
+ FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)};
+ if (Result.Success)
+ {
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
+ if (JsonError.empty())
+ {
+ json11::Json::array Needs = Json["needs"].array_items();
+ for (const auto& Need : Needs)
+ {
+ Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value()));
+ }
+ }
+ }
+ return Result;
+}
+
+CloudCacheResult
+CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId)
+{
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv);
+}
+
+/**
+ * An access token provider that holds a token that will never change.
+ */
+class StaticTokenProvider final : public CloudCacheTokenProvider
+{
+public:
+ StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {}
+
+ virtual ~StaticTokenProvider() = default;
+
+ virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; }
+
+private:
+ CloudCacheAccessToken m_Token;
+};
+
+std::unique_ptr<CloudCacheTokenProvider>
+CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token)
+{
+ return std::make_unique<StaticTokenProvider>(std::move(Token));
+}
+
+class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider
+{
+public:
+ OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params)
+ {
+ m_Url = std::string(Params.Url);
+ m_ClientId = std::string(Params.ClientId);
+ m_ClientSecret = std::string(Params.ClientSecret);
+ }
+
+ virtual ~OAuthClientCredentialsTokenProvider() = default;
+
+ virtual CloudCacheAccessToken AcquireAccessToken() final override
+ {
+ using namespace std::chrono;
+
+ std::string Body =
+ fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret);
+
+ cpr::Response Response =
+ cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)});
+
+ if (Response.error || Response.status_code != 200)
+ {
+ return {};
+ }
+
+ std::string JsonError;
+ json11::Json Json = json11::Json::parse(Response.text, JsonError);
+
+ if (JsonError.empty() == false)
+ {
+ return {};
+ }
+
+ std::string Token = Json["access_token"].string_value();
+ int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value());
+ CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds);
+
+ return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime};
+ }
+
+private:
+ std::string m_Url;
+ std::string m_ClientId;
+ std::string m_ClientSecret;
+};
+
+std::unique_ptr<CloudCacheTokenProvider>
+CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params)
+{
+ return std::make_unique<OAuthClientCredentialsTokenProvider>(Params);
+}
+
+class CallbackTokenProvider final : public CloudCacheTokenProvider
+{
+public:
+ CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {}
+
+ virtual ~CallbackTokenProvider() = default;
+
+ virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); }
+
+private:
+ std::function<CloudCacheAccessToken()> m_Callback;
+};
+
+std::unique_ptr<CloudCacheTokenProvider>
+CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback)
+{
+ return std::make_unique<CallbackTokenProvider>(std::move(Callback));
+}
+
+static std::optional<std::function<HttpClientAccessToken()>>
+GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
+{
+ if (TokenProvider == nullptr)
+ {
+ return {};
+ }
+ auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
+ CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
+ return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
+ };
+ return ProviderFunc;
+}
+
+CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
+: m_Log(zen::logging::Get("jupiter"))
+, m_DefaultDdcNamespace(Options.DdcNamespace)
+, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
+, m_ComputeCluster(Options.ComputeCluster)
+, m_TokenProvider(std::move(TokenProvider))
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
+{
+ ZEN_ASSERT(m_TokenProvider.get() != nullptr);
+}
+
+CloudCacheClient::~CloudCacheClient()
+{
+}
+
+} // namespace zen