// Copyright Epic Games, Inc. All Rights Reserved. #include "jupiter.h" #include "diag/logging.h" #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif #include using namespace std::literals; namespace zen { namespace detail { CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { if (Response.Error) { return {.SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = Response.Error.value().ErrorCode, .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } if (!Response.IsSuccess()) { return {.SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = static_cast(Response.StatusCode), .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } return {.Response = Response.ResponsePayload, .SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = 0, .Success = true}; } } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { } CloudCacheSession::~CloudCacheSession() { } CloudCacheResult CloudCacheSession::Authenticate() { bool OK = m_CacheClient->m_HttpClient.Authenticate(); return {.Success = OK}; } CloudCacheResult CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::GetRef"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); } CloudCacheResult CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetBlob"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), TempFolderPath, {HttpClient::Accept(ZenContentType::kCompressedBinary)}); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), TempFolderPath, {{"Accept", "application/x-jupiter-inline"}}); CloudCacheResult Result = detail::ConvertResponse(Response); if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) { OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); } } return Result; } CloudCacheResult CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response); } PutRefResult CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::PutRef"); Ref.SetContentType(RefType); IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref, {{"X-Jupiter-IoHash", Hash.ToHexString()}}); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } Result.RawHash = Hash; } return Result; } FinalizeRefResult CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } return Result; } CloudCacheResult CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutBlob"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); Blob.SetContentType(ZenContentType::kCompressedBinary); HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Payload, ZenContentType::kCompressedBinary); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("JupiterClient::PutObject"); Object.SetContentType(ZenContentType::kCbObject); HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); return detail::ConvertResponse(Response); } CloudCacheResult CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::RefExists"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); return detail::ConvertResponse(Response); } GetObjectReferencesResult CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kCbObject)}); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { const CbObject ReferencesResponse = Response.AsObject(); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } return Result; } CloudCacheResult CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } CloudCacheResult CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "objects"sv, Key); } CloudCacheExistsResult CloudCacheSession::BlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } CloudCacheExistsResult CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "objects"sv, Keys); } std::vector CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector& ChunkHashes) { // ExtendableStringBuilder<256> Uri; // Uri << m_CacheClient->ServiceUrl(); // Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(Namespace, BucketId, ChunkHashes); return {}; } CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); return detail::ConvertResponse(Response); } CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set& Keys) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Body; Body << "["; for (const auto& Key : Keys) { Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); Payload.SetContentType(ZenContentType::kJSON); HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { const CbObject ExistsResponse = Response.AsObject(); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } return Result; } /** * An access token provider that holds a token that will never change. */ class StaticTokenProvider final : public CloudCacheTokenProvider { public: StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} virtual ~StaticTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } private: CloudCacheAccessToken m_Token; }; std::unique_ptr CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) { return std::make_unique(std::move(Token)); } class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider { public: OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) { m_Url = std::string(Params.Url); m_ClientId = std::string(Params.ClientId); m_ClientSecret = std::string(Params.ClientSecret); } virtual ~OAuthClientCredentialsTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { using namespace std::chrono; std::string Body = fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); cpr::Response Response = cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); if (Response.error || Response.status_code != 200) { return {}; } std::string JsonError; json11::Json Json = json11::Json::parse(Response.text, JsonError); if (JsonError.empty() == false) { return {}; } std::string Token = Json["access_token"].string_value(); int64_t ExpiresInSeconds = static_cast(Json["expires_in"].int_value()); CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; } private: std::string m_Url; std::string m_ClientId; std::string m_ClientSecret; }; std::unique_ptr CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) { return std::make_unique(Params); } class CallbackTokenProvider final : public CloudCacheTokenProvider { public: CallbackTokenProvider(std::function&& Callback) : m_Callback(std::move(Callback)) {} virtual ~CallbackTokenProvider() = default; virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } private: std::function m_Callback; }; std::unique_ptr CloudCacheTokenProvider::CreateFromCallback(std::function&& Callback) { return std::make_unique(std::move(Callback)); } static std::optional> GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) { if (TokenProvider == nullptr) { return {}; } auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; }; return ProviderFunc; } CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr TokenProvider) : m_Log(zen::logging::Get("jupiter")) , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) , m_TokenProvider(std::move(TokenProvider)) , m_HttpClient(Options.ServiceUrl, HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, .Timeout = Options.Timeout, .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), .AssumeHttp2 = Options.AssumeHttp2, .AllowResume = Options.AllowResume, .RetryCount = Options.RetryCount}) { ZEN_ASSERT(m_TokenProvider.get() != nullptr); } CloudCacheClient::~CloudCacheClient() { } } // namespace zen