diff options
| author | Stefan Boberg <[email protected]> | 2025-01-29 15:08:03 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2025-01-29 15:08:03 +0100 |
| commit | e64c8727ecb073ca03e2c7d4b3972c375c1b6315 (patch) | |
| tree | 04a1a7c178c43666de7f7f9b472ed156f6373da5 /src/zenserver/upstream/jupiter.cpp | |
| parent | Merge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zen (diff) | |
| parent | handle special backslash followed by quote for paths (#279) (diff) | |
| download | zen-sb/cleanup-main.tar.xz zen-sb/cleanup-main.zip | |
Merge branch 'main' of https://github.ol.epicgames.net/ue-foundation/zensb/cleanup-main
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 662 |
1 files changed, 0 insertions, 662 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp deleted file mode 100644 index 2ae977f00..000000000 --- a/src/zenserver/upstream/jupiter.cpp +++ /dev/null @@ -1,662 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "jupiter.h" - -#include "diag/logging.h" - -#include <zencore/compactbinary.h> -#include <zencore/compositebuffer.h> -#include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/scopeguard.h> -#include <zencore/thread.h> -#include <zencore/trace.h> -#include <zenhttp/formatters.h> -#include <zenutil/basicfile.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <fmt/format.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -#endif - -#include <json11.hpp> - -using namespace std::literals; - -namespace zen { - -namespace detail { - CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) - { - if (Response.Error) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = Response.Error.value().ErrorCode, - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - if (!Response.IsSuccess()) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = static_cast<int32_t>(Response.StatusCode), - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - return {.Response = Response.ResponsePayload, - .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = 0, - .Success = true}; - } -} // namespace detail - -CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) -{ -} - -CloudCacheSession::~CloudCacheSession() -{ -} - -CloudCacheResult -CloudCacheSession::Authenticate() -{ - bool OK = m_CacheClient->m_HttpClient.Authenticate(); - return {.Success = OK}; -} - -CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::GetRef"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - {HttpClient::Accept(RefType)}); - - return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); -} - -CloudCacheResult -CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetBlob"); - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kBinary)}); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - TempFolderPath, - {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - TempFolderPath, - {{"Accept", "application/x-jupiter-inline"}}); - - CloudCacheResult Result = detail::ConvertResponse(Response); - - if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) - { - const std::string& PayloadHashHeader = It->second; - if (PayloadHashHeader.length() == IoHash::StringLength) - { - OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); - } - } - - return Result; -} - -CloudCacheResult -CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObject"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - return detail::ConvertResponse(Response); -} - -PutRefResult -CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::PutRef"); - - Ref.SetContentType(RefType); - - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - Ref, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutRefResult Result = {detail::ConvertResponse(Response)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) -{ - ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( - fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), - {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); - - FinalizeRefResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutBlob"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - Blob.SetContentType(ZenContentType::kCompressedBinary); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - Payload, - ZenContentType::kCompressedBinary); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) -{ - ZEN_TRACE_CPU("JupiterClient::PutObject"); - - Object.SetContentType(ZenContentType::kCbObject); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::RefExists"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ReferencesResponse = Response.AsObject(); - for (auto& Item : ReferencesResponse["references"sv]) - { - Result.References.insert(Item.AsHash()); - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "objects"sv, Key); -} - -CloudCacheExistsResult -CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "objects"sv, Keys); -} - -std::vector<IoHash> -CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) -{ - // ExtendableStringBuilder<256> Uri; - // Uri << m_CacheClient->ServiceUrl(); - // Uri << "/api/v1/s/" << Namespace; - - ZEN_UNUSED(Namespace, BucketId, ChunkHashes); - - return {}; -} - -CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Body; - Body << "["; - for (const auto& Key : Keys) - { - Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; - } - Body << "]"; - IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); - Payload.SetContentType(ZenContentType::kJSON); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); - - CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ExistsResponse = Response.AsObject(); - for (auto& Item : ExistsResponse["needs"sv]) - { - Result.Needs.insert(Item.AsHash()); - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv); -} - -CloudCacheResult -CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv); -} - -CloudCacheResult -CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); - return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv); -} - -PutBuildPartResult -CloudCacheSession::PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - - IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), - Payload, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -CloudCacheResult -CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv); -} - -CloudCacheResult -CloudCacheSession::PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload, - ContentType); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv); -} - -CloudCacheResult -CloudCacheSession::GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - std::filesystem::path TempFolderPath) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Download( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - TempFolderPath); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv); -} - -CloudCacheResult -CloudCacheSession::PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_CacheClient->m_HttpClient.Put( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv); -} - -FinalizeBuildPartResult -CloudCacheSession::FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), - HttpClient::Accept(ZenContentType::kCbObject)); - - FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv); -} - -/** - * An access token provider that holds a token that will never change. - */ -class StaticTokenProvider final : public CloudCacheTokenProvider -{ -public: - StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} - - virtual ~StaticTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } - -private: - CloudCacheAccessToken m_Token; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) -{ - return std::make_unique<StaticTokenProvider>(std::move(Token)); -} - -class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider -{ -public: - OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) - { - m_Url = std::string(Params.Url); - m_ClientId = std::string(Params.ClientId); - m_ClientSecret = std::string(Params.ClientSecret); - } - - virtual ~OAuthClientCredentialsTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override - { - using namespace std::chrono; - - std::string Body = - fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); - - cpr::Response Response = - cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); - - if (Response.error || Response.status_code != 200) - { - return {}; - } - - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); - - if (JsonError.empty() == false) - { - return {}; - } - - std::string Token = Json["access_token"].string_value(); - int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); - CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); - - return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; - } - -private: - std::string m_Url; - std::string m_ClientId; - std::string m_ClientSecret; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) -{ - return std::make_unique<OAuthClientCredentialsTokenProvider>(Params); -} - -class CallbackTokenProvider final : public CloudCacheTokenProvider -{ -public: - CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {} - - virtual ~CallbackTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } - -private: - std::function<CloudCacheAccessToken()> m_Callback; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback) -{ - return std::make_unique<CallbackTokenProvider>(std::move(Callback)); -} - -static std::optional<std::function<HttpClientAccessToken()>> -GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) -{ - if (TokenProvider == nullptr) - { - return {}; - } - auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { - CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); - return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; - }; - return ProviderFunc; -} - -CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) -: m_Log(zen::logging::Get("jupiter")) -, m_DefaultDdcNamespace(Options.DdcNamespace) -, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) -, m_ComputeCluster(Options.ComputeCluster) -, m_TokenProvider(std::move(TokenProvider)) -, m_HttpClient(Options.ServiceUrl, - HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = Options.AllowResume, - .RetryCount = Options.RetryCount}) -{ - ZEN_ASSERT(m_TokenProvider.get() != nullptr); -} - -CloudCacheClient::~CloudCacheClient() -{ -} - -} // namespace zen |