diff options
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 416 |
1 files changed, 280 insertions, 136 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index ecd51a706..40d7ebd26 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -12,6 +12,9 @@ #include "cache/structuredcachestore.h" #include "diag/logging.h" +#include <fmt/format.h> + +#include <algorithm> #include <atomic> #include <deque> #include <thread> @@ -83,70 +86,32 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; -} // namespace detail - -////////////////////////////////////////////////////////////////////////// - -class DefaultUpstreamCache final : public UpstreamCache -{ -public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) - , m_Options(Options) - , m_CacheStore(CacheStore) - , m_CidStore(CidStore) + class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_Options.JupiterEnabled) + public: + JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) { - m_CloudClient = new CloudCacheClient(m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace, - m_Options.JupiterOAuthProvider, - m_Options.JupiterOAuthClientId, - m_Options.JupiterOAuthSecret); - - std::string TmpAuthHeader; - if (m_CloudClient->AcquireAccessToken(TmpAuthHeader) && m_CloudClient->IsValid()) - { - m_Log.info("using Jupiter endpoint: '{}', DDC namespace: '{}', Blob Store namespace: '{}'", - m_Options.JupiterEndpoint, - m_Options.JupiterDdcNamespace, - m_Options.JupiterBlobStoreNamespace); - } - else - { - m_Log.warn("failed to initialized Jupiter at '{}'", m_Options.JupiterEndpoint); - } + using namespace fmt::literals; + m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); } - m_IsRunning = m_CloudClient && m_CloudClient->IsValid(); + virtual ~JupiterUpstreamEndpoint() = default; - if (m_IsRunning) + virtual bool Initialize() override { - m_Log.info("using '{}' upstream thread(s)", m_Options.ThreadCount); - for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) - { - m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); - } + //TODO: Test and authenticate Jupiter client connection + return !m_Client->ServiceUrl().empty(); } - else - { - m_Log.warn("upstream disabled, no valid endpoints"); - } - } - virtual ~DefaultUpstreamCache() { Shutdown(); } + virtual std::string_view DisplayName() const override { return m_DisplayName; } - virtual GetCacheRecordResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - zen::Stopwatch Timer; - try { - CloudCacheSession Session(m_CloudClient); - CloudCacheResult Result; + zen::CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (Type == ZenContentType::kBinary) { @@ -161,126 +126,293 @@ public: } catch (std::exception& e) { - m_Log.warn("get cache record ({}/{}) FAILED after {:5}: '{}'", - CacheKey.Bucket, - CacheKey.Hash, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + zen::CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + return {.Value = Result.Value, .Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } - virtual GetCachePayloadResult GetCachePayload(UpstreamPayloadKey PayloadKey) override - { - if (m_CloudClient && m_CloudClient->IsValid()) + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override { - zen::Stopwatch Timer; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; try { - CloudCacheSession Session(m_CloudClient); + CloudCacheSession Session(m_Client); + + if (CacheRecord.Type == ZenContentType::kBinary) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } - CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Payload = Result.Value, .Success = Result.Success}; + return {.Success = Result.Success}; + } + else + { + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + if (!Result.Success) + { + return {.Reason = "Failed to upload payload", .Success = false}; + } + } + + { + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + { + Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue); + } + + return {.Success = Result.Success}; + } + } } catch (std::exception& e) { - m_Log.warn("get cache payload ({}/{}/{}) FAILED after {:5}: '{}'", - PayloadKey.CacheKey.Bucket, - PayloadKey.CacheKey.Hash, - PayloadKey.PayloadId, - zen::NiceTimeSpanMs(Timer.getElapsedTimeMs()), - e.what()); + return {.Reason = std::string(e.what()), .Success = false}; } } - return {}; - } + private: + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + }; - virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint { - if (m_IsRunning.load()) + public: + ZenUpstreamEndpoint(std::string_view ServiceUrl) { - m_UpstreamQueue.Enqueue(std::move(CacheRecord)); - return {.Success = true}; + using namespace fmt::literals; + m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_Client = new ZenStructuredCacheClient(ServiceUrl); } - return {}; - } + ~ZenUpstreamEndpoint() = default; -private: - void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) - { - const uint32_t MaxAttempts = 3; + virtual bool Initialize() override + { + //TODO: Test and authenticate Zen client connection + return !m_Client->ServiceUrl().empty(); + } - if (m_CloudClient && m_CloudClient->IsValid()) + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - CloudCacheSession Session(m_CloudClient); - ZenCacheValue CacheValue; - if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + try { - m_Log.warn("process upstream FAILED, '{}/{}' doesn't exist in cache", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash); - return; + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + return {.Value = Result.Response, .Success = Result.Success}; } - - if (CacheRecord.Type == ZenContentType::kBinary) + catch (std::exception& e) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutDerivedData(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + return {.Reason = std::string(e.what()), .Success = false}; + } + } - if (!Result.Success) - { - m_Log.warn("upload (binary) '{}/{}' FAILED after '{}' attempts", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MaxAttempts); - } + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + try + { + ZenStructuredCacheSession Session(*m_Client); + const ZenCacheResult Result = + Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + return {.Value = Result.Response, .Success = Result.Success}; } - else + catch (std::exception& e) { - ZEN_ASSERT(CacheRecord.Type == ZenContentType::kCbObject); + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, + IoBuffer RecordValue, + std::span<IoBuffer const> Payloads) override + { + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); + const uint32_t MaxAttempts = 3; - CloudCacheResult Result; - for (const IoHash& PayloadId : CacheRecord.PayloadIds) + try + { + zen::ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { Result.Success = false; - if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutCompressedBlob(PayloadId, Payload); - } + Result = Session.PutCachePayload(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + CacheRecord.PayloadIds[Idx], + Payloads[Idx]); } if (!Result.Success) { - m_Log.warn("upload payload '{}' FAILED after '{}' attempts", PayloadId, MaxAttempts); - break; + return {.Reason = "Failed to upload payload", .Success = false}; } } - if (Result.Success) + Result.Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - Result.Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue.Value); - } + Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - if (!Result.Success) - { - m_Log.warn("upload cache record '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - } + return {.Success = Result.Success}; + } + catch (std::exception& e) + { + return {.Reason = std::string(e.what()), .Success = false}; + } + } + + private: + std::string m_DisplayName; + RefPtr<zen::ZenStructuredCacheClient> m_Client; + }; + +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamCache final : public UpstreamCache +{ +public: + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(zen::logging::Get("upstream")) + , m_Options(Options) + , m_CacheStore(CacheStore) + , m_CidStore(CidStore) + { + ZEN_ASSERT(m_Options.ThreadCount > 0); + } + + virtual ~DefaultUpstreamCache() { Shutdown(); } + + virtual bool Initialize() override + { + auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { + const bool Ok = Endpoint->Initialize(); + m_Log.info("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); + return !Ok; + }); + + m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); + m_IsRunning = !m_Endpoints.empty(); + + if (m_IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); + } + } + + return m_IsRunning; + } + + virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + + virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + { + for (auto& Endpoint : m_Endpoints) + { + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + return Result; + } + } + + return {}; + } + + virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override + { + if (m_IsRunning.load()) + { + m_UpstreamQueue.Enqueue(std::move(CacheRecord)); + return {.Success = true}; + } + + return {}; + } + +private: + void ProcessCacheRecord(UpstreamCacheRecord CacheRecord) + { + ZenCacheValue CacheValue; + std::vector<IoBuffer> Payloads; + + if (!m_CacheStore.Get(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, CacheValue)) + { + m_Log.warn("process upstream FAILED, '{}/{}', cache record doesn't exist", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash); + return; + } + + for (const IoHash& PayloadId : CacheRecord.PayloadIds) + { + if (IoBuffer Payload = m_CidStore.FindChunkByCid(PayloadId)) + { + Payloads.push_back(Payload); + } + else + { + m_Log.warn("process upstream FAILED, '{}/{}/{}', payload doesn't exist in CAS", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + PayloadId); + return; } } + + for (auto& Endpoint : m_Endpoints) + { + Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + } } void ProcessUpstreamQueue() @@ -320,20 +452,20 @@ private: } m_UpstreamThreads.clear(); + m_Endpoints.clear(); } } using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - RefPtr<CloudCacheClient> m_CloudClient; - RefPtr<ZenStructuredCacheClient> m_ZenClient; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ::ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// @@ -344,4 +476,16 @@ MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheSto return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } +std::unique_ptr<UpstreamEndpoint> +MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) +{ + return std::make_unique<detail::JupiterUpstreamEndpoint>(Options); +} + +std::unique_ptr<UpstreamEndpoint> +MakeZenUpstreamEndpoint(std::string_view Url) +{ + return std::make_unique<detail::ZenUpstreamEndpoint>(Url); +} + } // namespace zen |