// Copyright Epic Games, Inc. All Rights Reserved. #include "jupiter.h" #include "cache/structuredcachestore.h" #include "diag/logging.h" #include #include #include #include #include #include // For some reason, these don't seem to stick, so we disable the warnings //# define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING 1 //# define _SILENCE_ALL_CXX17_DEPRECATION_WARNINGS 1 #pragma warning(push) #pragma warning(disable : 4004) #pragma warning(disable : 4996) #include #pragma warning(pop) #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "Crypt32.lib") # pragma comment(lib, "Wldap32.lib") #endif #include using namespace std::literals; using namespace fmt::literals; namespace zen { namespace detail { struct CloudCacheSessionState { CloudCacheSessionState(CloudCacheClient& Client) : OwnerClient(Client) {} ~CloudCacheSessionState() {} void Reset() { std::string Auth; OwnerClient.AcquireAccessToken(Auth); Session.SetBody({}); Session.SetOption(cpr::Header{{"Authorization", Auth}}); } CloudCacheClient& OwnerClient; cpr::Session Session; }; void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) { std::string_view ContentType = "unknown"sv; if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) { ContentType = It->second; } const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; if (IsBinary) { Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", Verb, Response.url.str(), Response.status_code, Response.elapsed, ContentType, Bytes, Response.reason); } else { Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", Verb, Response.url.str(), Response.status_code, Response.elapsed, ContentType, Response.text, Response.reason); } } } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) { m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", Auth}}); cpr::Response Response = Session.Get(); detail::Log(m_Log, "GET"sv, Response); if (Response.status_code == 200) { return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } return {.Success = false}; } CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) { return GetDerivedData(BucketId, Key.ToHexString()); } CloudCacheResult CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key) { std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}}); cpr::Response Response = Session.Get(); detail::Log(m_Log, "GET"sv, Response); if (Response.status_code == 200) { return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } return {.Success = false}; } CloudCacheResult CloudCacheSession::GetCompressedBlob(const IoHash& Key) { std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); cpr::Response Response = Session.Get(); detail::Log(m_Log, "GET"sv, Response); if (Response.status_code == 200) { return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } return {.Success = false}; } CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size()); std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); cpr::Response Response = Session.Put(); detail::Log(m_Log, "PUT"sv, Response); return {.Success = Response.status_code == 200}; } CloudCacheResult CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) { return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); } CloudCacheResult CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref) { IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size()); std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); cpr::Response Response = Session.Put(); detail::Log(m_Log, "PUT"sv, Response); return {.Success = Response.status_code == 200}; } CloudCacheResult CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) { std::string Auth; m_CacheClient->AcquireAccessToken(Auth); ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); detail::Log(m_Log, "PUT"sv, Response); return {.Success = Response.status_code == 200}; } std::vector CloudCacheSession::Filter(std::string_view BucketId, const std::vector& ChunkHashes) { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); ZEN_UNUSED(BucketId, ChunkHashes); return {}; } ////////////////////////////////////////////////////////////////////////// std::string CloudCacheAccessToken::GetAuthorizationHeaderValue() { RwLock::SharedLockScope _(m_Lock); return "Bearer {}"_format(m_Token); } inline void CloudCacheAccessToken::SetToken(std::string_view Token) { RwLock::ExclusiveLockScope _(m_Lock); m_Token = Token; ++m_Serial; } ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com // DdcNamespace: ue4.ddc // OAuthClientId: 0oao91lrhqPiAlaGD0x7 // OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, std::string_view DdcNamespace, std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret) : m_Log(zen::logging::Get("jupiter")) , m_ServiceUrl(ServiceUrl) , m_OAuthFullUri(OAuthProvider) , m_DdcNamespace(DdcNamespace) , m_BlobStoreNamespace(BlobStoreNamespace) , m_DefaultBucket("default") , m_OAuthClientId(OAuthClientId) , m_OAuthSecret(OAuthSecret) { if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) { m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); m_IsValid = false; return; } // Split into host and Uri substrings auto SchemePos = OAuthProvider.find("://"sv); if (SchemePos == std::string::npos) { m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); m_IsValid = false; return; } auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); if (DomainEnd == std::string::npos) { m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); m_IsValid = false; return; } m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token } CloudCacheClient::~CloudCacheClient() { RwLock::ExclusiveLockScope _(m_SessionStateLock); for (auto State : m_SessionStateCache) { delete State; } } bool CloudCacheClient::AcquireAccessToken(std::string& AuthorizationHeaderValue) { // TODO: check for expiration if (!m_IsValid) { ExtendableStringBuilder<128> OAuthFormData; OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; const uint32_t CurrentSerial = m_AccessToken.GetSerial(); static RwLock AuthMutex; RwLock::ExclusiveLockScope _(AuthMutex); // Protect against redundant authentication operations if (m_AccessToken.GetSerial() != CurrentSerial) { // TODO: this could verify that the token is actually valid and retry if not? return true; } std::string data{OAuthFormData}; cpr::Response Response = cpr::Post(cpr::Url{m_OAuthFullUri}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{data}); std::string Body{std::move(Response.text)}; // Parse JSON response std::string JsonError; json11::Json JsonResponse = json11::Json::parse(Body, /* out */ JsonError); if (!JsonError.empty()) { spdlog::warn("failed to parse OAuth response: '{}'", JsonError); return false; } std::string AccessToken = JsonResponse["access_token"].string_value(); int ExpiryTimeSeconds = JsonResponse["expires_in"].int_value(); m_AccessToken.SetToken(AccessToken); m_IsValid = true; } AuthorizationHeaderValue = m_AccessToken.GetAuthorizationHeaderValue(); return true; } detail::CloudCacheSessionState* CloudCacheClient::AllocSessionState() { detail::CloudCacheSessionState* State = nullptr; if (RwLock::ExclusiveLockScope _(m_SessionStateLock); !m_SessionStateCache.empty()) { State = m_SessionStateCache.front(); m_SessionStateCache.pop_front(); } if (State == nullptr) { State = new detail::CloudCacheSessionState(*this); } State->Reset(); return State; } void CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) { RwLock::ExclusiveLockScope _(m_SessionStateLock); m_SessionStateCache.push_front(State); } } // namespace zen