diff options
| author | Dan Engelbrecht <[email protected]> | 2025-01-16 09:19:08 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-01-16 09:19:08 +0100 |
| commit | a5158f9fc806d506590dd9bf0e3282cb76c3ac4e (patch) | |
| tree | 95a6dd46ad0520de4018e08ef6b3f409e25af3c3 /src/zenutil/jupiter.cpp | |
| parent | 5.5.17 (diff) | |
| download | zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.tar.xz zen-a5158f9fc806d506590dd9bf0e3282cb76c3ac4e.zip | |
move basicfile.h/cpp -> zencore (#273)
move jupiter.h/cpp -> zenutil
move packageformat.h/.cpp -> zenhttp
zenutil now depends on zenhttp instead of the inverse
Diffstat (limited to 'src/zenutil/jupiter.cpp')
| -rw-r--r-- | src/zenutil/jupiter.cpp | 660 |
1 files changed, 660 insertions, 0 deletions
diff --git a/src/zenutil/jupiter.cpp b/src/zenutil/jupiter.cpp new file mode 100644 index 000000000..df4af0c13 --- /dev/null +++ b/src/zenutil/jupiter.cpp @@ -0,0 +1,660 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter.h> + +#include <zencore/basicfile.h> +#include <zencore/compactbinary.h> +#include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/scopeguard.h> +#include <zencore/thread.h> +#include <zencore/trace.h> +#include <zenhttp/formatters.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +ZEN_THIRD_PARTY_INCLUDES_END + +//#if ZEN_PLATFORM_WINDOWS +//# pragma comment(lib, "Crypt32.lib") +//# pragma comment(lib, "Wldap32.lib") +//#endif + +#include <json11.hpp> + +using namespace std::literals; + +namespace zen { + +namespace detail { + CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + if (!Response.IsSuccess()) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + return {.Response = Response.ResponsePayload, + .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = 0, + .Success = true}; + } +} // namespace detail + +CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) +{ +} + +CloudCacheSession::~CloudCacheSession() +{ +} + +CloudCacheResult +CloudCacheSession::Authenticate() +{ + bool OK = m_CacheClient->m_HttpClient.Authenticate(); + return {.Success = OK}; +} + +CloudCacheResult +CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::GetRef"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + {HttpClient::Accept(RefType)}); + + return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); +} + +CloudCacheResult +CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kBinary)}); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); + + CloudCacheResult Result = detail::ConvertResponse(Response); + + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) + { + const std::string& PayloadHashHeader = It->second; + if (PayloadHashHeader.length() == IoHash::StringLength) + { + OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); + } + } + + return Result; +} + +CloudCacheResult +CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObject"); + + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + return detail::ConvertResponse(Response); +} + +PutRefResult +CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::PutRef"); + + Ref.SetContentType(RefType); + + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + Ref, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutRefResult Result = {detail::ConvertResponse(Response)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +FinalizeRefResult +CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); + + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + + FinalizeRefResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +CloudCacheResult +CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutBlob"); + + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) +{ + ZEN_TRACE_CPU("JupiterClient::PutObject"); + + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); + + return detail::ConvertResponse(Response); +} + +CloudCacheResult +CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::RefExists"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +GetObjectReferencesResult +CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ReferencesResponse = Response.AsObject(); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + return Result; +} + +CloudCacheResult +CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "blobs"sv, Key); +} + +CloudCacheResult +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); +} + +CloudCacheResult +CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "objects"sv, Key); +} + +CloudCacheExistsResult +CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "blobs"sv, Keys); +} + +CloudCacheExistsResult +CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); +} + +CloudCacheExistsResult +CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "objects"sv, Keys); +} + +std::vector<IoHash> +CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +{ + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; + + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); + + return {}; +} + +CloudCacheResult +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +CloudCacheExistsResult +CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Body; + Body << "["; + for (const auto& Key : Keys) + { + Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; + } + Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); + + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); + + CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ExistsResponse = Response.AsObject(); + for (auto& Item : ExistsResponse["needs"sv]) + { + Result.Needs.insert(Item.AsHash()); + } + } + return Result; +} + +CloudCacheResult +CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv); +} + +CloudCacheResult +CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv); +} + +CloudCacheResult +CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); + return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv); +} + +PutBuildPartResult +CloudCacheSession::PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + + IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), + Payload, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +CloudCacheResult +CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv); +} + +CloudCacheResult +CloudCacheSession::PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload, + ContentType); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv); +} + +CloudCacheResult +CloudCacheSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Download( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + TempFolderPath); + return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv); +} + +CloudCacheResult +CloudCacheSession::PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload); + return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv); +} + +FinalizeBuildPartResult +CloudCacheSession::FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), + HttpClient::Accept(ZenContentType::kCbObject)); + + FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +CloudCacheResult +CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv); +} + +/** + * An access token provider that holds a token that will never change. + */ +class StaticTokenProvider final : public CloudCacheTokenProvider +{ +public: + StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} + + virtual ~StaticTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } + +private: + CloudCacheAccessToken m_Token; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) +{ + return std::make_unique<StaticTokenProvider>(std::move(Token)); +} + +class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider +{ +public: + OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) + { + m_Url = std::string(Params.Url); + m_ClientId = std::string(Params.ClientId); + m_ClientSecret = std::string(Params.ClientSecret); + } + + virtual ~OAuthClientCredentialsTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override + { + using namespace std::chrono; + + std::string Body = + fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); + + cpr::Response Response = + cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); + + if (Response.error || Response.status_code != 200) + { + return {}; + } + + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + + if (JsonError.empty() == false) + { + return {}; + } + + std::string Token = Json["access_token"].string_value(); + int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); + CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); + + return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; + } + +private: + std::string m_Url; + std::string m_ClientId; + std::string m_ClientSecret; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) +{ + return std::make_unique<OAuthClientCredentialsTokenProvider>(Params); +} + +class CallbackTokenProvider final : public CloudCacheTokenProvider +{ +public: + CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {} + + virtual ~CallbackTokenProvider() = default; + + virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } + +private: + std::function<CloudCacheAccessToken()> m_Callback; +}; + +std::unique_ptr<CloudCacheTokenProvider> +CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback) +{ + return std::make_unique<CallbackTokenProvider>(std::move(Callback)); +} + +static std::optional<std::function<HttpClientAccessToken()>> +GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) +{ + if (TokenProvider == nullptr) + { + return {}; + } + auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { + CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); + return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; + }; + return ProviderFunc; +} + +CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) +: m_Log(zen::logging::Get("jupiter")) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) +, m_TokenProvider(std::move(TokenProvider)) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) +{ + ZEN_ASSERT(m_TokenProvider.get() != nullptr); +} + +CloudCacheClient::~CloudCacheClient() +{ +} + +} // namespace zen |