aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/jupiter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
-rw-r--r--zenserver/upstream/jupiter.cpp215
1 files changed, 151 insertions, 64 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 7a36b5841..4caa5c8df 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -182,9 +182,14 @@ CloudCacheSession::GetBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const IoBuffer Buffer =
+ Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
@@ -214,6 +219,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -246,6 +255,10 @@ CloudCacheSession::GetObject(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -571,67 +584,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::BlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
-
- Session.SetOption(cpr::Url{Uri.c_str()});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return CacheTypeExists("blobs"sv, Key);
}
CloudCacheResult
CloudCacheSession::CompressedBlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
+ return CacheTypeExists("compressed-blobs"sv, Key);
+}
- Session.SetOption(cpr::Url{Uri.c_str()});
+CloudCacheResult
+CloudCacheSession::ObjectExists(const IoHash& Key)
+{
+ return CacheTypeExists("objects"sv, Key);
+}
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+CloudCacheExistsResult
+CloudCacheSession::BlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("blobs"sv, Keys);
+}
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
+CloudCacheExistsResult
+CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("compressed-blobs"sv, Keys);
+}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+CloudCacheExistsResult
+CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("objects"sv, Keys);
}
CloudCacheResult
-CloudCacheSession::ObjectExists(const IoHash& Key)
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -640,14 +627,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
- cpr::Session& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
if (Response.error)
{
@@ -662,7 +651,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -671,13 +660,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
auto& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -698,7 +686,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
CloudCacheResult
-CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+CloudCacheSession::GetObjectTree(const IoHash& Key)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -707,15 +695,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree";
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->Session;
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
if (Response.error)
{
@@ -761,6 +749,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
return true;
}
+CloudCacheResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+
+ cpr::Response Response = Session.Head();
+ ZEN_DEBUG("HEAD {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ ExtendableStringBuilder<256> Query;
+ for (const auto& Key : Keys)
+ {
+ Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString();
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query;
+
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ CloudCacheExistsResult Result{
+ CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}};
+
+ if (Result.Success)
+ {
+ IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ for (auto& Item : ExistsResponse["id"sv])
+ {
+ if (Item.IsHash())
+ {
+ Result.Have.insert(Item.AsHash());
+ }
+ }
+ }
+
+ return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
@@ -778,7 +852,14 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
, m_BlobStoreNamespace(Options.BlobStoreNamespace)
, m_OAuthClientId(Options.OAuthClientId)
, m_OAuthSecret(Options.OAuthSecret)
+, m_AccessToken(Options.AccessToken)
{
+ if (!Options.AccessToken.empty())
+ {
+ // If an access token was provided, OAuth settings are not used.
+ return;
+ }
+
if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider);
@@ -828,6 +909,12 @@ CloudCacheClient::AcquireAccessToken()
{
using namespace std::chrono;
+ // If an access token was provided, return it instead of querying OAuth
+ if (!m_AccessToken.empty())
+ {
+ return {m_AccessToken, steady_clock::time_point::max()};
+ }
+
ExtendableStringBuilder<128> OAuthFormData;
OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;