diff options
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 215 |
1 files changed, 151 insertions, 64 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 7a36b5841..4caa5c8df 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -182,9 +182,14 @@ CloudCacheSession::GetBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const IoBuffer Buffer = + Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } @@ -214,6 +219,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -246,6 +255,10 @@ CloudCacheSession::GetObject(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -571,67 +584,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) CloudCacheResult CloudCacheSession::BlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; - - Session.SetOption(cpr::Url{Uri.c_str()}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return CacheTypeExists("blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; + return CacheTypeExists("compressed-blobs"sv, Key); +} - Session.SetOption(cpr::Url{Uri.c_str()}); +CloudCacheResult +CloudCacheSession::ObjectExists(const IoHash& Key) +{ + return CacheTypeExists("objects"sv, Key); +} - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); +CloudCacheExistsResult +CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("blobs"sv, Keys); +} - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } +CloudCacheExistsResult +CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("compressed-blobs"sv, Keys); +} - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +CloudCacheExistsResult +CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("objects"sv, Keys); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -640,14 +627,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; - cpr::Session& Session = m_SessionState->Session; + auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); if (Response.error) { @@ -662,7 +651,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -671,13 +660,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; auto& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -698,7 +686,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } CloudCacheResult -CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +CloudCacheSession::GetObjectTree(const IoHash& Key) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -707,15 +695,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree"; - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->Session; Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); if (Response.error) { @@ -761,6 +749,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) return true; } +CloudCacheResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + + cpr::Response Response = Session.Head(); + ZEN_DEBUG("HEAD {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +CloudCacheExistsResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + ExtendableStringBuilder<256> Query; + for (const auto& Key : Keys) + { + Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString(); + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query; + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + CloudCacheExistsResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ExistsResponse["id"sv]) + { + if (Item.IsHash()) + { + Result.Have.insert(Item.AsHash()); + } + } + } + + return Result; +} + ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com @@ -778,7 +852,14 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) , m_BlobStoreNamespace(Options.BlobStoreNamespace) , m_OAuthClientId(Options.OAuthClientId) , m_OAuthSecret(Options.OAuthSecret) +, m_AccessToken(Options.AccessToken) { + if (!Options.AccessToken.empty()) + { + // If an access token was provided, OAuth settings are not used. + return; + } + if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider); @@ -828,6 +909,12 @@ CloudCacheClient::AcquireAccessToken() { using namespace std::chrono; + // If an access token was provided, return it instead of querying OAuth + if (!m_AccessToken.empty()) + { + return {m_AccessToken, steady_clock::time_point::max()}; + } + ExtendableStringBuilder<128> OAuthFormData; OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; |