diff options
| author | Per Larsson <[email protected]> | 2021-08-31 15:01:46 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-08-31 15:16:22 +0200 |
| commit | fd3946f2b2b013af01fdf60f67afb655c38c1901 (patch) | |
| tree | eca4abed5d71a157e185699f4e9668a92b756ca8 /zenserver/upstream | |
| parent | Removed unused packages from vcpkg.json (diff) | |
| download | zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.tar.xz zen-fd3946f2b2b013af01fdf60f67afb655c38c1901.zip | |
Asynchronous upstream caching to Jupiter
Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 235 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 38 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 347 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 82 |
4 files changed, 624 insertions, 78 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 977bcc712..09be2c776 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -3,6 +3,7 @@ #include "jupiter.h" #include "cache/structuredcachestore.h" +#include "diag/logging.h" #include <fmt/format.h> #include <zencore/compactbinary.h> @@ -25,7 +26,6 @@ # pragma comment(lib, "Wldap32.lib") #endif -#include <spdlog/spdlog.h> #include <json11.hpp> using namespace std::literals; @@ -51,9 +51,47 @@ namespace detail { CloudCacheClient& OwnerClient; cpr::Session Session; }; + + void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + { + std::string_view ContentType = "unknown"sv; + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + ContentType = It->second; + } + + const double Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; + + const bool IsBinary = + ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; + + if (IsBinary) + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Bytes, + Response.reason); + } + else + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Response.text, + Response.reason); + } + } + } // namespace detail -CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_CacheClient(OuterClient) +CloudCacheSession::CloudCacheSession(CloudCacheClient* OuterClient) : m_Log(OuterClient->Logger()), m_CacheClient(OuterClient) { m_SessionState = m_CacheClient->AllocSessionState(); } @@ -63,94 +101,163 @@ CloudCacheSession::~CloudCacheSession() m_CacheClient->FreeSessionState(m_SessionState); } -#define TESTING_PREFIX "aaaaa" - -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, std::string_view Key) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; - auto& Session = m_SessionState->Session; - Session.SetUrl(cpr::Url{Uri.c_str()}); + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}}); cpr::Response Response = Session.Get(); - if (!Response.error) + detail::Log(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) { - return IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } - return {}; + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +CloudCacheResult +CloudCacheSession::GetDerivedData(std::string_view BucketId, const IoHash& Key) { + return GetDerivedData(BucketId, Key.ToHexString()); +} + +CloudCacheResult +CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-cb"}}); - IoHash Hash = IoHash::HashMemory(Data.Data(), Data.Size()); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); + if (Response.status_code == 200) + { + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; + } + + return {.Success = false}; +} + +CloudCacheResult +CloudCacheSession::GetCompressedBlob(const IoHash& Key) +{ std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); - Session.SetOption(cpr::Body{(const char*)Data.Data(), Data.Size()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Accept", "application/x-ue-comp"}}); - cpr::Response Response = Session.Put(); + cpr::Response Response = Session.Get(); + detail::Log(m_Log, "GET"sv, Response); - if (Response.error) + if (Response.status_code == 200) { - spdlog::warn("PUT failed: '{}'", Response.error.message); + return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; } + + return {.Success = false}; } -void -CloudCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData) { + IoHash Hash = IoHash::HashMemory(DerivedData.Data(), DerivedData.Size()); + + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/c/ddc/" << m_CacheClient->Namespace() << "/" << BucketId << "/" TESTING_PREFIX << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; auto& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/octet-stream"}}); + Session.SetBody(cpr::Body{(const char*)DerivedData.Data(), DerivedData.Size()}); + + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; +} + +CloudCacheResult +CloudCacheSession::PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData) +{ + return PutDerivedData(BucketId, Key.ToHexString(), DerivedData); +} + +CloudCacheResult +CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref) +{ + IoHash Hash = IoHash::HashMemory(Ref.Data(), Ref.Size()); + std::string Auth; m_CacheClient->AcquireAccessToken(Auth); + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" + << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption( + cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); - if (Data.Value.GetContentType() == ZenContentType::kCbObject) - { - CbObjectView Cbo(Data.Value.Data()); - const IoHash Hash = Cbo.GetHash(); - const MemoryView DataView = Cbo.GetView(); + cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + return {.Success = Response.status_code == 200}; +} - Session.SetOption(cpr::Body{reinterpret_cast<const char*>(DataView.GetData()), DataView.GetSize()}); - } - else - { - const IoHash Hash = IoHash::HashMemory(Data.Value.Data(), Data.Value.Size()); +CloudCacheResult +CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) +{ + std::string Auth; + m_CacheClient->AcquireAccessToken(Auth); - Session.SetOption( - cpr::Header{{"Authorization", Auth}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - Session.SetOption(cpr::Body{reinterpret_cast<const char*>(Data.Value.Data()), Data.Value.Size()}); - } + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", Auth}, {"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); cpr::Response Response = Session.Put(); + detail::Log(m_Log, "PUT"sv, Response); - if (Response.error) - { - spdlog::warn("PUT failed: '{}'", Response.error.message); - } + return {.Success = Response.status_code == 200}; } std::vector<IoHash> @@ -158,7 +265,7 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& { ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << m_CacheClient->Namespace(); + Uri << "/api/v1/s/" << m_CacheClient->DdcNamespace(); ZEN_UNUSED(BucketId, ChunkHashes); @@ -167,17 +274,6 @@ CloudCacheSession::Filter(std::string_view BucketId, const std::vector<IoHash>& ////////////////////////////////////////////////////////////////////////// -IoBuffer -CloudCacheSession::Get(std::string_view BucketId, const IoHash& Key) -{ - StringBuilder<64> KeyString; - Key.ToHexString(KeyString); - - return Get(BucketId, KeyString); -} - -////////////////////////////////////////////////////////////////////////// - std::string CloudCacheAccessToken::GetAuthorizationHeaderValue() { @@ -197,27 +293,30 @@ CloudCacheAccessToken::SetToken(std::string_view Token) ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com -// Namespace: ue4.ddc +// DdcNamespace: ue4.ddc // OAuthClientId: 0oao91lrhqPiAlaGD0x7 // OAuthProvider: https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, - std::string_view Namespace, + std::string_view DdcNamespace, + std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret) -: m_ServiceUrl(ServiceUrl) +: m_Log(zen::logging::Get("jupiter")) +, m_ServiceUrl(ServiceUrl) , m_OAuthFullUri(OAuthProvider) -, m_Namespace(Namespace) +, m_DdcNamespace(DdcNamespace) +, m_BlobStoreNamespace(BlobStoreNamespace) , m_DefaultBucket("default") , m_OAuthClientId(OAuthClientId) , m_OAuthSecret(OAuthSecret) { if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) { - spdlog::warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); m_IsValid = false; return; @@ -229,7 +328,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (SchemePos == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); m_IsValid = false; return; @@ -239,7 +338,7 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, if (DomainEnd == std::string::npos) { - spdlog::warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); m_IsValid = false; return; diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 2ed458142..61d1bd99c 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -6,6 +6,8 @@ #include <zencore/refcount.h> #include <zencore/thread.h> +#include <spdlog/spdlog.h> + #include <atomic> #include <list> #include <memory> @@ -39,6 +41,12 @@ private: std::atomic<uint32_t> m_Serial; }; +struct CloudCacheResult +{ + IoBuffer Value; + bool Success = false; +}; + /** * Context for performing Jupiter operations * @@ -52,17 +60,20 @@ public: CloudCacheSession(CloudCacheClient* OuterClient); ~CloudCacheSession(); - // Key-value cache operations - IoBuffer Get(std::string_view BucketId, std::string_view Key); - void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); + CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); + CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key); + CloudCacheResult GetCompressedBlob(const IoHash& Key); - // Structured cache operations - IoBuffer Get(std::string_view BucketId, const IoHash& Key); - void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data); + CloudCacheResult PutDerivedData(std::string_view BucketId, std::string_view Key, IoBuffer DerivedData); + CloudCacheResult PutDerivedData(std::string_view BucketId, const IoHash& Key, IoBuffer DerivedData); + CloudCacheResult PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Ref); + CloudCacheResult PutCompressedBlob(const IoHash& Key, IoBuffer Blob); std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); private: + spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; detail::CloudCacheSessionState* m_SessionState; }; @@ -74,28 +85,35 @@ class CloudCacheClient : public RefCounted { public: CloudCacheClient(std::string_view ServiceUrl, - std::string_view Namespace, + std::string_view DdcNamespace, + std::string_view BlobStoreNamespace, std::string_view OAuthProvider, std::string_view OAuthClientId, std::string_view OAuthSecret); ~CloudCacheClient(); bool AcquireAccessToken(std::string& AuthorizationHeaderValue); - std::string_view Namespace() const { return m_Namespace; } + std::string_view DdcNamespace() const { return m_DdcNamespace; } + std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } std::string_view DefaultBucket() const { return m_DefaultBucket; } std::string_view ServiceUrl() const { return m_ServiceUrl; } + bool IsValid() const { return m_IsValid; } + + spdlog::logger& Logger() { return m_Log; } private: - bool m_IsValid = false; + spdlog::logger& m_Log; std::string m_ServiceUrl; std::string m_OAuthDomain; std::string m_OAuthUriPath; std::string m_OAuthFullUri; - std::string m_Namespace; + std::string m_DdcNamespace; + std::string m_BlobStoreNamespace; std::string m_DefaultBucket; std::string m_OAuthClientId; std::string m_OAuthSecret; CloudCacheAccessToken m_AccessToken; + bool m_IsValid = false; RwLock m_SessionStateLock; std::list<detail::CloudCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp new file mode 100644 index 000000000..ecd51a706 --- /dev/null +++ b/zenserver/upstream/upstreamcache.cpp @@ -0,0 +1,347 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamcache.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/fmtutils.h> +#include <zencore/timer.h> +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <atomic> +#include <deque> +#include <thread> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + template<typename T> + class BlockingQueue + { + public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Num() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + + private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + if (m_Options.JupiterEnabled) + { + m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace, + m_Options.JupiterOAuthProvider, + m_Options.JupiterOAuthClientId, + m_Options.JupiterOAuthSecret); + + std::string TmpAuthHeader; + if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) + { + m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", + m_Options.JupiterEndpoint, + m_Options.JupiterDdcNamespace, + m_Options.JupiterBlobStoreNamespace); + } + else + { + m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); + } + } + + m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + + if (m_IsRunning) + { + m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + else + { + m_Log.warn("upstream disabled, no valid endpoints"); + } + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + CloudCacheResult Result; + + if (Type == ZenContentType::kBinary) + { + Result = Session.GetDerivedData(CacheKey.Bucket, CacheKey.Hash); + } + else + { + Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash); + } + + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", + CacheKey.Bucket, + CacheKey.Hash, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + if (m_CloudClient && m_CloudClient->IsValid()) + { + zen::Stopwatch Timer; + + try + { + CloudCacheSession Session(m_CloudClient); + + CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Payload = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", + PayloadKey.CacheKey.Bucket, + PayloadKey.CacheKey.Hash, + PayloadKey.PayloadId, + zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), + e.what()); + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + const uint32_t MaxAttempts = 3; + + if (m_CloudClient && m_CloudClient->IsValid()) + { + CloudCacheSession Session(m_CloudClient); + ZenCacheValue CacheValue; + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + + if (!Result.Success) + { + m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + MaxAttempts); + } + } + else + { + ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + + CloudCacheResult Result; + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + Result.Success = false; + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(PayloadId, Payload); + } + } + + if (!Result.Success) + { + m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); + break; + } + } + + if (Result.Success) + { + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); + } + } + + if (!Result.Success) + { + m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); + } + } + } + } + + void ProcessUpstreamQueue() + { + for (;;) + { + UpstreamCacheRecord CacheRecord; + if (m_UpstreamQueue.WaitAndDequeue(CacheRecord)) + { + try + { + ProcessCacheRecord(std::move(CacheRecord)); + } + catch (std::exception& e) + { + m_Log.warn("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + } + } + + if (!m_IsRunning.load()) + { + break; + } + } + } + + void Shutdown() + { + if (m_IsRunning.load()) + { + m_IsRunning.store(false); + m_UpstreamQueue.CompleteAdding(); + + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_UpstreamThreads.clear(); + } + } + + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + RefPtr<CloudCacheClient> m_CloudClient; + RefPtr<ZenStructuredCacheClient> m_ZenClient; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; +}; + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamCache> +MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); +} + +} // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h new file mode 100644 index 000000000..23a542151 --- /dev/null +++ b/zenserver/upstream/upstreamcache.h @@ -0,0 +1,82 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <memory> + +class ZenCacheStore; + +namespace zen { + +class CidStore; + +struct UpstreamCacheKey +{ + std::string Bucket; + IoHash Hash; +}; + +struct UpstreamPayloadKey +{ + UpstreamCacheKey CacheKey; + IoHash PayloadId; +}; + +struct UpstreamCacheRecord +{ + ZenContentType Type = ZenContentType::kBinary; + UpstreamCacheKey CacheKey; + std::vector<IoHash> PayloadIds; +}; + +struct UpstreamCacheOptions +{ + uint32_t ThreadCount = 4; + bool JupiterEnabled = true; + std::string_view JupiterEndpoint; + std::string_view JupiterDdcNamespace; + std::string_view JupiterBlobStoreNamespace; + std::string_view JupiterOAuthProvider; + std::string_view JupiterOAuthClientId; + std::string_view JupiterOAuthSecret; +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamCache +{ +public: + virtual ~UpstreamCache() = default; + + struct GetCacheRecordResult + { + IoBuffer Value; + bool Success = false; + }; + + virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + + struct GetCachePayloadResult + { + IoBuffer Payload; + bool Success = false; + }; + + virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + + struct EnqueueResult + { + bool Success = false; + }; + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; +}; + +std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); + +} // namespace zen |