aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/jupiter.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-11-12 08:57:38 +0100
committerMartin Ridgers <[email protected]>2021-11-12 08:57:38 +0100
commit96888168970c947636e9663cbf7c3dd219d1aeab (patch)
tree55824a08801932ad75100868204254a6372af7e3 /zenserver/upstream/jupiter.cpp
parentFixed dlopen/close() unresolved symbols (diff)
parentHorde remote execute (#25) (diff)
downloadzen-96888168970c947636e9663cbf7c3dd219d1aeab.tar.xz
zen-96888168970c947636e9663cbf7c3dd219d1aeab.zip
Merged main
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
-rw-r--r--zenserver/upstream/jupiter.cpp215
1 files changed, 151 insertions, 64 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 11f8a3171..af416c182 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -181,9 +181,14 @@ CloudCacheSession::GetBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const IoBuffer Buffer =
+ Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
@@ -213,6 +218,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -245,6 +254,10 @@ CloudCacheSession::GetObject(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -570,67 +583,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::BlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
-
- Session.SetOption(cpr::Url{Uri.c_str()});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return CacheTypeExists("blobs"sv, Key);
}
CloudCacheResult
CloudCacheSession::CompressedBlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
+ return CacheTypeExists("compressed-blobs"sv, Key);
+}
- Session.SetOption(cpr::Url{Uri.c_str()});
+CloudCacheResult
+CloudCacheSession::ObjectExists(const IoHash& Key)
+{
+ return CacheTypeExists("objects"sv, Key);
+}
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+CloudCacheExistsResult
+CloudCacheSession::BlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("blobs"sv, Keys);
+}
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
+CloudCacheExistsResult
+CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("compressed-blobs"sv, Keys);
+}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+CloudCacheExistsResult
+CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("objects"sv, Keys);
}
CloudCacheResult
-CloudCacheSession::ObjectExists(const IoHash& Key)
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -639,14 +626,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
- cpr::Session& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
if (Response.error)
{
@@ -661,7 +650,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -670,13 +659,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -697,7 +685,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
CloudCacheResult
-CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+CloudCacheSession::GetObjectTree(const IoHash& Key)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -706,15 +694,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree";
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
if (Response.error)
{
@@ -760,6 +748,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
return true;
}
+CloudCacheResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+
+ cpr::Response Response = Session.Head();
+ ZEN_DEBUG("HEAD {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ ExtendableStringBuilder<256> Query;
+ for (const auto& Key : Keys)
+ {
+ Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString();
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query;
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ CloudCacheExistsResult Result{
+ CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}};
+
+ if (Result.Success)
+ {
+ IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ for (auto& Item : ExistsResponse["id"sv])
+ {
+ if (Item.IsHash())
+ {
+ Result.Have.insert(Item.AsHash());
+ }
+ }
+ }
+
+ return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
@@ -777,7 +851,14 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
, m_BlobStoreNamespace(Options.BlobStoreNamespace)
, m_OAuthClientId(Options.OAuthClientId)
, m_OAuthSecret(Options.OAuthSecret)
+, m_AccessToken(Options.AccessToken)
{
+ if (!Options.AccessToken.empty())
+ {
+ // If an access token was provided, OAuth settings are not used.
+ return;
+ }
+
if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider);
@@ -827,6 +908,12 @@ CloudCacheClient::AcquireAccessToken()
{
using namespace std::chrono;
+ // If an access token was provided, return it instead of querying OAuth
+ if (!m_AccessToken.empty())
+ {
+ return {m_AccessToken, steady_clock::time_point::max()};
+ }
+
ExtendableStringBuilder<128> OAuthFormData;
OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;