diff options
| author | Per Larsson <[email protected]> | 2021-09-03 15:37:19 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-09-03 15:37:19 +0200 |
| commit | c04fa527593da17c719c4d899ec19caeb6480a94 (patch) | |
| tree | fa65fd7585d55712bad546d76a0fc3518023a905 /zenserver/upstream | |
| parent | oops: Fixed AssertException implementation namespace (diff) | |
| download | zen-c04fa527593da17c719c4d899ec19caeb6480a94.tar.xz zen-c04fa527593da17c719c4d899ec19caeb6480a94.zip | |
Zen upstream support (#7)
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 38 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 19 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 416 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 64 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 130 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 25 |
6 files changed, 477 insertions, 215 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index a59b9d317..7e22b9af9 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -52,7 +52,7 @@ namespace detail { cpr::Session Session; }; - void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + static void Log(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) { std::string_view ContentType = "unknown"sv; if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) @@ -299,24 +299,18 @@ CloudCacheAccessToken::SetToken(std::string_view Token) // OAuthSecret: -GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d // -CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, - std::string_view DdcNamespace, - std::string_view BlobStoreNamespace, - std::string_view OAuthProvider, - std::string_view OAuthClientId, - std::string_view OAuthSecret) +CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) : m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(ServiceUrl) -, m_OAuthFullUri(OAuthProvider) -, m_DdcNamespace(DdcNamespace) -, m_BlobStoreNamespace(BlobStoreNamespace) -, m_DefaultBucket("default") -, m_OAuthClientId(OAuthClientId) -, m_OAuthSecret(OAuthSecret) +, m_ServiceUrl(Options.ServiceUrl) +, m_OAuthFullUri(Options.OAuthProvider) +, m_DdcNamespace(Options.DdcNamespace) +, m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_OAuthClientId(Options.OAuthClientId) +, m_OAuthSecret(Options.OAuthSecret) { - if (!OAuthProvider.starts_with("http://"sv) && !OAuthProvider.starts_with("https://"sv)) + if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { - m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(OAuthProvider).c_str()); + m_Log.warn("bad provider specification: '{}' - must be fully qualified"_format(Options.OAuthProvider).c_str()); m_IsValid = false; return; @@ -324,28 +318,28 @@ CloudCacheClient::CloudCacheClient(std::string_view ServiceUrl, // Split into host and Uri substrings - auto SchemePos = OAuthProvider.find("://"sv); + auto SchemePos = Options.OAuthProvider.find("://"sv); if (SchemePos == std::string::npos) { - m_Log.warn("Bad service URL passed to cloud cache client: '{}'", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}'", Options.ServiceUrl); m_IsValid = false; return; } - auto DomainEnd = OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); + auto DomainEnd = Options.OAuthProvider.find('/', /* also skip the :// */ SchemePos + 3); if (DomainEnd == std::string::npos) { - m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", ServiceUrl); + m_Log.warn("Bad service URL passed to cloud cache client: '{}' no path delimiter found", Options.ServiceUrl); m_IsValid = false; return; } - m_OAuthDomain = OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com - m_OAuthUriPath = OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token + m_OAuthDomain = Options.OAuthProvider.substr(SchemePos + 3, DomainEnd - SchemePos - 3); // epicgames.okta.com + m_OAuthUriPath = Options.OAuthProvider.substr(DomainEnd + 1); // oauth2/..../v1/token } CloudCacheClient::~CloudCacheClient() diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 61d1bd99c..ba5e0a65a 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -78,24 +78,28 @@ private: detail::CloudCacheSessionState* m_SessionState; }; +struct CloudCacheClientOptions +{ + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view OAuthProvider; + std::string_view OAuthClientId; + std::string_view OAuthSecret; +}; + /** * Jupiter upstream cache client */ class CloudCacheClient : public RefCounted { public: - CloudCacheClient(std::string_view ServiceUrl, - std::string_view DdcNamespace, - std::string_view BlobStoreNamespace, - std::string_view OAuthProvider, - std::string_view OAuthClientId, - std::string_view OAuthSecret); + CloudCacheClient(const CloudCacheClientOptions& Options); ~CloudCacheClient(); bool AcquireAccessToken(std::string& AuthorizationHeaderValue); std::string_view DdcNamespace() const { return m_DdcNamespace; } std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } - std::string_view DefaultBucket() const { return m_DefaultBucket; } std::string_view ServiceUrl() const { return m_ServiceUrl; } bool IsValid() const { return m_IsValid; } @@ -109,7 +113,6 @@ private: std::string m_OAuthFullUri; std::string m_DdcNamespace; std::string m_BlobStoreNamespace; - std::string m_DefaultBucket; std::string m_OAuthClientId; std::string m_OAuthSecret; CloudCacheAccessToken m_AccessToken; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index ecd51a706..40d7ebd26 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -12,6 +12,9 @@ #include "cache/structuredcachestore.h" #include "diag/logging.h" +#include <fmt/format.h> + +#include <algorithm> #include <atomic> #include <deque> #include <thread> @@ -83,70 +86,32 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -class DefaultUpstreamCache final : public UpstreamCache -{ -public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) - , m_Options(Options) - , m_CacheStore(CacheStore) - , m_CidStore(CidStore) + class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_Options.JupiterEnabled) + public: + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) { - m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace, - m_Options.JupiterOAuthProvider, - m_Options.JupiterOAuthClientId, - m_Options.JupiterOAuthSecret); - - std::string TmpAuthHeader; - if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) - { - m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", - m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace); - } - else - { - m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); - } + using namespace fmt::literals; + m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); } - m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + virtual ~JupiterUpstreamEndpoint() = default; - if (m_IsRunning) + virtual bool Initialize() override { - m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) - { - m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); - } + //TODO: Test and authenticate Jupiter client connection + return !m_Client->ServiceUrl().empty(); } - else - { - m_Log.warn("upstream disabled, no valid endpoints"); - } - } - virtual ~DefaultUpstreamCache() { Shutdown(); } + virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - zen::Stopwatch Timer; - try { - CloudCacheSession Session(m_CloudClient); - CloudCacheResult Result; + zen::CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (Type == ZenContentType::kBinary) { @@ -161,126 +126,293 @@ public: } catch (std::exception& e) { - m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", - CacheKey.Bucket, - CacheKey.Hash, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + zen::CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } - virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override { - zen::Stopwatch Timer; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; try { - CloudCacheSession Session(m_CloudClient); + CloudCacheSession Session(m_Client); + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } - CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Payload = Result.Value, .Success = Result.Success}; + return {.Success = Result.Success}; + } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", .Success = false}; + } + } + + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } + + return {.Success = Result.Success}; + } + } } catch (std::exception& e) { - m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", - PayloadKey.CacheKey.Bucket, - PayloadKey.CacheKey.Hash, - PayloadKey.PayloadId, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + private: + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + }; - virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_IsRunning.load()) + public: + ZenUpstreamEndpoint(std::string_view ServiceUrl) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); - return {.Success = true}; + using namespace fmt::literals; + m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_Client = new ZenStructuredCacheClient(ServiceUrl); } - return {}; - } + ~ZenUpstreamEndpoint() = default; -private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) - { - const uint32_t MaxAttempts = 3; + virtual bool Initialize() override + { + //TODO: Test and authenticate Zen client connection + return !m_Client->ServiceUrl().empty(); + } - if (m_CloudClient && m_CloudClient->IsValid()) + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - CloudCacheSession Session(m_CloudClient); - ZenCacheValue CacheValue; - if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + try { - m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash); - return; + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + return {.Value = Result.Response, .Success = Result.Success}; } - - if (CacheRecord.Type == ZenContentType::kBinary) + catch (std::exception& e) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + return {.Reason = std::string(e.what()), .Success = false}; + } + } - if (!Result.Success) - { - m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MaxAttempts); - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = + Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + return {.Value = Result.Response, .Success = Result.Success}; } - else + catch (std::exception& e) { - ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override + { + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; - CloudCacheResult Result; - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + try + { + zen::ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCompressedBlob(PayloadId, Payload); - } + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); } if (!Result.Success) { - m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); - break; + return {.Reason = "Failed to upload payload", .Success = false}; } } - if (Result.Success) + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - if (!Result.Success) - { - m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - } + return {.Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + private: + std::string m_DisplayName; + RefPtr<zen::ZenStructuredCacheClient> m_Client; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + ZEN_ASSERT(m_Options.ThreadCount > 0); + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual bool Initialize() override + { + auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { + const bool Ok = Endpoint->Initialize(); + m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); + return !Ok; + }); + + m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); + m_IsRunning = !m_Endpoints.empty(); + + if (m_IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + + return m_IsRunning; + } + + virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + ZenCacheValue CacheValue; + std::vector<IoBuffer> Payloads; + + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + Payloads.push_back(Payload); + } + else + { + m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PayloadId); + return; } } + + for (auto& Endpoint : m_Endpoints) + { + Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + } } void ProcessUpstreamQueue() @@ -320,20 +452,20 @@ private: } m_UpstreamThreads.clear(); + m_Endpoints.clear(); } } using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - RefPtr<CloudCacheClient> m_CloudClient; - RefPtr<ZenStructuredCacheClient> m_ZenClient; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// @@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } +std::unique_ptr<UpstreamEndpoint> +MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) +{ + return std::make_unique<detail::JupiterUpstreamEndpoint>(Options); +} + +std::unique_ptr<UpstreamEndpoint> +MakeZenUpstreamEndpoint(std::string_view Url) +{ + return std::make_unique<detail::ZenUpstreamEndpoint>(Url); +} + } // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 23a542151..d8359bc2c 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -13,6 +13,7 @@ class ZenCacheStore; namespace zen { class CidStore; +struct CloudCacheClientOptions; struct UpstreamCacheKey { @@ -35,14 +36,41 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; - bool JupiterEnabled = true; - std::string_view JupiterEndpoint; - std::string_view JupiterDdcNamespace; - std::string_view JupiterBlobStoreNamespace; - std::string_view JupiterOAuthProvider; - std::string_view JupiterOAuthClientId; - std::string_view JupiterOAuthSecret; + uint32_t ThreadCount = 4; +}; + +struct GetUpstreamCacheResult +{ + IoBuffer Value; + std::string Reason; + bool Success = false; +}; + +struct PutUpstreamCacheResult +{ + std::string Reason; + bool Success; +}; + +/** + * The upstream endpont is responsible for handling upload/downloading of cache records. + */ +class UpstreamEndpoint +{ +public: + virtual ~UpstreamEndpoint() = default; + + virtual bool Initialize() = 0; + + virtual std::string_view DisplayName() const = 0; + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) = 0; }; /** @@ -53,21 +81,13 @@ class UpstreamCache public: virtual ~UpstreamCache() = default; - struct GetCacheRecordResult - { - IoBuffer Value; - bool Success = false; - }; + virtual bool Initialize() = 0; - virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; - struct GetCachePayloadResult - { - IoBuffer Payload; - bool Success = false; - }; + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; - virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; struct EnqueueResult { @@ -79,4 +99,8 @@ public: std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); +std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); + +std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url); + } // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index e9102ad45..3d4999e5d 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -8,6 +8,7 @@ #include <zencore/stream.h> #include "cache/structuredcachestore.h" +#include "diag/logging.h" // cpr //////////////////////////////////////////////////////////////////// // @@ -20,7 +21,6 @@ #include <cpr/cpr.h> #pragma warning(pop) -#include <spdlog/spdlog.h> #include <xxhash.h> #include <gsl/gsl-lite.hpp> @@ -322,6 +322,45 @@ namespace detail { ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; + + static void LogResponse(spdlog::logger& Log, std::string_view Verb, const cpr::Response& Response) + { + using namespace std::literals; + + std::string_view ContentType = "unknown"sv; + if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) + { + ContentType = It->second; + } + + const uint64_t Bytes = Verb == "GET"sv ? Response.downloaded_bytes : Response.uploaded_bytes; + + const bool IsBinary = + ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || ContentType == "application/octet-stream"; + + if (IsBinary) + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}' '{}' Bytes, Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Bytes, + Response.reason); + } + else + { + Log.debug("{} '{}', Status: '{}', Elapsed: '{}', Content-Type: '{}': '{}', Reason: '{}'", + Verb, + Response.url.str(), + Response.status_code, + Response.elapsed, + ContentType, + Response.text, + Response.reason); + } + } } // namespace detail ////////////////////////////////////////////////////////////////////////// @@ -364,50 +403,99 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) : m_Client(OuterClient) +using namespace std::literals; + +ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) +: m_Log(zen::logging::Get("zenclient"sv)) +, m_Client(OuterClient) { + m_SessionState = m_Client.AllocSessionState(); } ZenStructuredCacheSession::~ZenStructuredCacheSession() { + m_Client.FreeSessionState(m_SessionState); } -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, std::string_view Key) +ZenCacheResult +ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) { - ZEN_UNUSED(BucketId, Key); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + + cpr::Response Response = Session.Get(); + detail::LogResponse(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) + { + return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; + } return {}; } -void -ZenStructuredCacheSession::Put(std::string_view BucketId, std::string_view Key, IoBuffer Data) +ZenCacheResult +ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId) { - ZEN_UNUSED(BucketId, Key, Data); -} + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); -// Structured cache operations + cpr::Session& Session = m_SessionState->Session; -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key) -{ - ZEN_UNUSED(BucketId, Key); + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + + cpr::Response Response = Session.Get(); + detail::LogResponse(m_Log, "GET"sv, Response); + + if (Response.status_code == 200) + { + return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; + } return {}; } -IoBuffer -ZenStructuredCacheSession::Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId) +ZenCacheResult +ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type) { - ZEN_UNUSED(BucketId, Key, ContentId); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - return {}; + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader( + cpr::Header{{"Content-Type", Type == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()}); + + cpr::Response Response = Session.Put(); + detail::LogResponse(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; } -void -ZenStructuredCacheSession::Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data) +ZenCacheResult +ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload) { - ZEN_UNUSED(BucketId, Key, Data); + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); + + cpr::Session& Session = m_SessionState->Session; + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); + Session.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()}); + + cpr::Response Response = Session.Put(); + detail::LogResponse(m_Log, "PUT"sv, Response); + + return {.Success = Response.status_code == 200}; } } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 1d7d5752e..5df6da4a3 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -20,6 +20,10 @@ struct ZenCacheValue; +namespace spdlog { +class logger; +} + namespace zen { class CbObjectWriter; @@ -81,6 +85,12 @@ namespace detail { struct ZenCacheSessionState; } +struct ZenCacheResult +{ + IoBuffer Response; + bool Success = false; +}; + /** Zen Structured Cache session * * This provides a context in which cache queries can be performed @@ -93,16 +103,13 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); - // Key-value cache operations - IoBuffer Get(std::string_view BucketId, std::string_view Key); - void Put(std::string_view BucketId, std::string_view Key, IoBuffer Data); - - // Structured cache operations - IoBuffer Get(std::string_view BucketId, const IoHash& Key); - IoBuffer Get(std::string_view BucketId, const IoHash& Key, const IoHash& ContentId); - void Put(std::string_view BucketId, const IoHash& Key, ZenCacheValue Data); + ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); + ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); + ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); + ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); private: + spdlog::logger& m_Log; ZenStructuredCacheClient& m_Client; detail::ZenCacheSessionState* m_SessionState; }; @@ -118,6 +125,8 @@ public: ZenStructuredCacheClient(std::string_view ServiceUrl); ~ZenStructuredCacheClient(); + std::string_view ServiceUrl() const { return m_ServiceUrl; } + private: std::string m_ServiceUrl; |