diff options
| author | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
| commit | 68c951e0f440ffd483795dced737e88152c1a581 (patch) | |
| tree | 5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver/upstream/upstreamcache.cpp | |
| parent | Merge main into linux-mac (diff) | |
| parent | Trigger storage scrubbing pass at startup (diff) | |
| download | zen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz zen-68c951e0f440ffd483795dced737e88152c1a581.zip | |
Merged main into linux-mac
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 97 |
1 files changed, 58 insertions, 39 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 38d30a795..d6b6d44be 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -94,7 +94,7 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; - class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint + class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) @@ -108,8 +108,9 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Jupiter client connection - return !m_Client->ServiceUrl().empty(); + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + return Result.Success; } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -118,8 +119,8 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); - CloudCacheResult Result; + CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { @@ -134,7 +135,7 @@ namespace detail { { CbPackage Package; - const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); @@ -183,7 +184,7 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); return {.Value = Result.Response, @@ -278,7 +279,7 @@ namespace detail { RefPtr<CloudCacheClient> m_Client; }; - class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint + class ZenUpstreamEndpoint final : public UpstreamEndpoint { public: ZenUpstreamEndpoint(std::string_view ServiceUrl) @@ -292,8 +293,20 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Zen client connection - return !m_Client->ServiceUrl().empty(); + try + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) + { + Result = Session.SayHello(); + } + return Result.Success; + } + catch (std::exception&) + { + return false; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -344,14 +357,14 @@ namespace detail { try { - zen::ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result; - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; if (CacheRecord.Type == ZenContentType::kCbPackage) { - zen::CbPackage Package; + CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); for (const IoBuffer& Payload : Payloads) @@ -427,8 +440,8 @@ namespace detail { } private: - std::string m_DisplayName; - RefPtr<zen::ZenStructuredCacheClient> m_Client; + std::string m_DisplayName; + RefPtr<ZenStructuredCacheClient> m_Client; }; } // namespace detail @@ -455,7 +468,7 @@ class UpstreamStats final }; public: - UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} + UpstreamStats() : m_Log(logging::Get("upstream")) {} void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) { @@ -523,8 +536,8 @@ private: class DefaultUpstreamCache final : public UpstreamCache { public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) @@ -559,12 +572,15 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -573,12 +589,15 @@ public: virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -587,7 +606,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load()) + if (m_IsRunning.load() && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -697,21 +716,21 @@ private: spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - UpstreamStats m_Stats; - std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; + std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamCache> -MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } |