aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/jupiter.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-08-31 15:01:46 +0200
committerPer Larsson <[email protected]>2021-08-31 15:16:22 +0200
commitfd3946f2b2b013af01fdf60f67afb655c38c1901 (patch)
treeeca4abed5d71a157e185699f4e9668a92b756ca8 /zenserver/upstream/jupiter.cpp
parentRemoved unused packages from vcpkg.json (diff)
downloadzen-fd3946f2b2b013af01fdf60f67afb655c38c1901.tar.xz
zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.zip
Asynchronous upstream caching to Jupiter
Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver/upstream/jupiter.cpp')
-rw-r--r--zenserver/upstream/jupiter.cpp235
1 files changed, 167 insertions, 68 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 977bcc712..09be2c776 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -3,6 +3,7 @@
#include "jupiter.h"
#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
#include <fmt/format.h>
#include <zencore/compactbinary.h>
@@ -25,7 +26,6 @@
# pragma comment(lib, "Wldap32.lib")
#endif
-#include <spdlog/spdlog.h>
#include <json11.hpp>
using namespace std::literals;
@@ -51,9 +51,47 @@ namespace detail {
CloudCacheClient& OwnerClient;
cpr::Session Session;
};
+
+ void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response)
+ {
+ std::string_view ContentType = "unknown"sv;
+ if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
+ {
+ ContentType = It->second;
+ }
+
+ const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes;
+
+ const bool IsBinary =
+ ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream";
+
+ if (IsBinary)
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Bytes,
+ Response.reason);
+ }
+ else
+ {
+ Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'",
+ Verb,
+ Response.url.str(),
+ Response.status_code,
+ Response.elapsed,
+ ContentType,
+ Response.text,
+ Response.reason);
+ }
+ }
+
} // namespace detail
-CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient)
+CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient)
{
m_SessionState = m_CacheClient->AllocSessionState();
}
@@ -63,94 +101,163 @@ CloudCacheSession::~CloudCacheSession()
m_CacheClient->FreeSessionState(m_SessionState);
}
-#define TESTING_PREFIX "aaaaa"
-
-IoBuffer
-CloudCacheSession::Get(std::string_view BucketId, std::string_view Key)
+CloudCacheResult
+CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
- auto& Session = m_SessionState->Session;
- Session.SetUrl(cpr::Url{Uri.c_str()});
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}});
cpr::Response Response = Session.Get();
- if (!Response.error)
+ detail::Log(m_Log, "GET"sv, Response);
+
+ if (Response.status_code == 200)
{
- return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
}
- return {};
+ return {.Success = false};
}
-void
-CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data)
+CloudCacheResult
+CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key)
{
+ return GetDerivedData(BucketId, Key.ToHexString());
+}
+
+CloudCacheResult
+CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key)
+{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString();
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}});
- IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size());
+ cpr::Response Response = Session.Get();
+ detail::Log(m_Log, "GET"sv, Response);
+ if (Response.status_code == 200)
+ {
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
+ }
+
+ return {.Success = false};
+}
+
+CloudCacheResult
+CloudCacheSession::GetCompressedBlob(const IoHash& Key)
+{
std::string Auth;
m_CacheClient->AcquireAccessToken(Auth);
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
- Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}});
- cpr::Response Response = Session.Put();
+ cpr::Response Response = Session.Get();
+ detail::Log(m_Log, "GET"sv, Response);
- if (Response.error)
+ if (Response.status_code == 200)
{
- spdlog::warn("PUT failed: '{}'", Response.error.message);
+ return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
}
+
+ return {.Success = false};
}
-void
-CloudCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data)
+CloudCacheResult
+CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData)
{
+ IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size());
+
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
auto& Session = m_SessionState->Session;
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(
+ cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}});
+ Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()});
+
+ cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
+
+ return {.Success = Response.status_code == 200};
+}
+
+CloudCacheResult
+CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData)
+{
+ return PutDerivedData(BucketId, Key.ToHexString(), DerivedData);
+}
+
+CloudCacheResult
+CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref)
+{
+ IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size());
+
std::string Auth;
m_CacheClient->AcquireAccessToken(Auth);
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
+ << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->Session;
+
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(
+ cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
- if (Data.Value.GetContentType() == ZenContentType::kCbObject)
- {
- CbObjectView Cbo(Data.Value.Data());
- const IoHash Hash = Cbo.GetHash();
- const MemoryView DataView = Cbo.GetView();
+ cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ return {.Success = Response.status_code == 200};
+}
- Session.SetOption(cpr::Body{reinterpret_cast<const char*>(DataView.GetData()), DataView.GetSize()});
- }
- else
- {
- const IoHash Hash = IoHash::HashMemory(Data.Value.Data(), Data.Value.Size());
+CloudCacheResult
+CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
+{
+ std::string Auth;
+ m_CacheClient->AcquireAccessToken(Auth);
- Session.SetOption(
- cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- Session.SetOption(cpr::Body{reinterpret_cast<const char*>(Data.Value.Data()), Data.Value.Size()});
- }
+ cpr::Session& Session = m_SessionState->Session;
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}});
+ Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
cpr::Response Response = Session.Put();
+ detail::Log(m_Log, "PUT"sv, Response);
- if (Response.error)
- {
- spdlog::warn("PUT failed: '{}'", Response.error.message);
- }
+ return {.Success = Response.status_code == 200};
}
std::vector<IoHash>
@@ -158,7 +265,7 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
{
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << m_CacheClient->Namespace();
+ Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace();
ZEN_UNUSED(BucketId, ChunkHashes);
@@ -167,17 +274,6 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>&
//////////////////////////////////////////////////////////////////////////
-IoBuffer
-CloudCacheSession::Get(std::string_view BucketId, const IoHash& Key)
-{
- StringBuilder<64> KeyString;
- Key.ToHexString(KeyString);
-
- return Get(BucketId, KeyString);
-}
-
-//////////////////////////////////////////////////////////////////////////
-
std::string
CloudCacheAccessToken::GetAuthorizationHeaderValue()
{
@@ -197,27 +293,30 @@ CloudCacheAccessToken::SetToken(std::string_view Token)
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
-// Namespace: ue4.ddc
+// DdcNamespace: ue4.ddc
// OAuthClientId: 0oao91lrhqPiAlaGD0x7
// OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token
// OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d
//
CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
- std::string_view Namespace,
+ std::string_view DdcNamespace,
+ std::string_view BlobStoreNamespace,
std::string_view OAuthProvider,
std::string_view OAuthClientId,
std::string_view OAuthSecret)
-: m_ServiceUrl(ServiceUrl)
+: m_Log(zen::logging::Get("jupiter"))
+, m_ServiceUrl(ServiceUrl)
, m_OAuthFullUri(OAuthProvider)
-, m_Namespace(Namespace)
+, m_DdcNamespace(DdcNamespace)
+, m_BlobStoreNamespace(BlobStoreNamespace)
, m_DefaultBucket("default")
, m_OAuthClientId(OAuthClientId)
, m_OAuthSecret(OAuthSecret)
{
if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv))
{
- spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
+ m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str());
m_IsValid = false;
return;
@@ -229,7 +328,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
if (SchemePos == std::string::npos)
{
- spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl);
m_IsValid = false;
return;
@@ -239,7 +338,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl,
if (DomainEnd == std::string::npos)
{
- spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
+ m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl);
m_IsValid = false;
return;