From fd3946f2b2b013af01fdf60f67afb655c38c1901 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 31 Aug 2021 15:01:46 +0200 Subject: Asynchronous upstream caching to Jupiter Co-authored-by: Stefan Boberg --- zenserver/upstream/jupiter.cpp | 235 +++++++++++++++++++++++++++++------------ 1 file changed, 167 insertions(+), 68 deletions(-) (limited to 'zenserver/upstream/jupiter.cpp') diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 977bcc712..09be2c776 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -3,6 +3,7 @@ #include "jupiter.h" #include "cache/structuredcachestore.h" +#include "diag/logging.h" #include #include @@ -25,7 +26,6 @@ # pragma comment(lib, "Wldap32.lib") #endif -#include #include using namespace std::literals; @@ -51,9 +51,47 @@ namespace detail { CloudCacheClient& OwnerClient; cpr::Session Session; }; + + void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + { + std::string_view ContentType = "unknown"sv; + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + ContentType = It->second; + } + + const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; + + const bool IsBinary = + ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; + + if (IsBinary) + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Bytes, + Response.reason); + } + else + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Response.text, + Response.reason); + } + } + } // namespace detail -CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient) +CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) { m_SessionState = m_CacheClient->AllocSessionState(); } @@ -63,94 +101,163 @@ CloudCacheSession::~CloudCacheSession() m_CacheClient->FreeSessionState(m_SessionState); } -#define TESTING_PREFIX "aaaaa" - -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, std::string_view Key) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; - auto& Session = m_SessionState->Session; - Session.SetUrl(cpr::Url{Uri.c_str()}); + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}}); cpr::Response Response = Session.Get(); - if (!Response.error) + detail::Log(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) { - return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } - return {}; + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) { + return GetDerivedData(BucketId, Key.ToHexString()); +} + +CloudCacheResult +CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}}); - IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size()); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); + if (Response.status_code == 200) + { + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; + } + + return {.Success = false}; +} + +CloudCacheResult +CloudCacheSession::GetCompressedBlob(const IoHash& Key) +{ std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); - Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); - if (Response.error) + if (Response.status_code == 200) { - spdlog::warn("PUT failed: '{}'", Response.error.message); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } + + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { + IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size()); + + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; auto& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); + + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; +} + +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +{ + return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); +} + +CloudCacheResult +CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref) +{ + IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size()); + std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); - if (Data.Value.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView Cbo(Data.Value.Data()); - const IoHash Hash = Cbo.GetHash(); - const MemoryView DataView = Cbo.GetView(); + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + return {.Success = Response.status_code == 200}; +} - Session.SetOption(cpr::Body{reinterpret_cast(DataView.GetData()), DataView.GetSize()}); - } - else - { - const IoHash Hash = IoHash::HashMemory(Data.Value.Data(), Data.Value.Size()); +CloudCacheResult +CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - Session.SetOption(cpr::Body{reinterpret_cast(Data.Value.Data()), Data.Value.Size()}); - } + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - if (Response.error) - { - spdlog::warn("PUT failed: '{}'", Response.error.message); - } + return {.Success = Response.status_code == 200}; } std::vector @@ -158,7 +265,7 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector& { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->Namespace(); + Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); ZEN_UNUSED(BucketId, ChunkHashes); @@ -167,17 +274,6 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector& ////////////////////////////////////////////////////////////////////////// -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, const IoHash& Key) -{ - StringBuilder<64> KeyString; - Key.ToHexString(KeyString); - - return Get(BucketId, KeyString); -} - -////////////////////////////////////////////////////////////////////////// - std::string CloudCacheAccessToken::GetAuthorizationHeaderValue() { @@ -197,27 +293,30 @@ CloudCacheAccessToken::SetToken(std::string_view Token) ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com -// Namespace: ue4.ddc +// DdcNamespace: ue4.ddc // OAuthClientId: 0oao91lrhqPiAlaGD0x7 // OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, - std::string_view Namespace, + std::string_view DdcNamespace, + std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret) -: m_ServiceUrl(ServiceUrl) +: m_Log(zen::logging::Get("jupiter")) +, m_ServiceUrl(ServiceUrl) , m_OAuthFullUri(OAuthProvider) -, m_Namespace(Namespace) +, m_DdcNamespace(DdcNamespace) +, m_BlobStoreNamespace(BlobStoreNamespace) , m_DefaultBucket("default") , m_OAuthClientId(OAuthClientId) , m_OAuthSecret(OAuthSecret) { if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) { - spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); m_IsValid = false; return; @@ -229,7 +328,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (SchemePos == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); m_IsValid = false; return; @@ -239,7 +338,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (DomainEnd == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); m_IsValid = false; return; -- cgit v1.2.3