diff options
| author | Per Larsson <[email protected]> | 2021-09-22 21:21:15 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-09-22 21:21:15 +0200 |
| commit | ddb84cb54f7cf6777d2ccaed4338fff56b75922c (patch) | |
| tree | bffa905f41526a5ed0ddbefed45573a069a2d845 /zenserver/upstream/upstreamcache.cpp | |
| parent | Made icon resource path relative, as it should be (diff) | |
| download | zen-ddb84cb54f7cf6777d2ccaed4338fff56b75922c.tar.xz zen-ddb84cb54f7cf6777d2ccaed4338fff56b75922c.zip | |
Made upstream endpoints more resilient to failures by checking health/reconnecting at regular intervals.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 206 |
1 file changed, 154 insertions, 52 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d6b6d44be..a889fb984 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -23,7 +23,6 @@ #include <algorithm> #include <atomic> #include <deque> -#include <limits> #include <thread> #include <unordered_map> @@ -106,11 +105,23 @@ namespace detail { virtual ~JupiterUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override { - CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.Authenticate(); - return Result.Success; + try + { + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -143,6 +154,7 @@ namespace detail { CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { @@ -169,14 +181,16 @@ namespace detail { } } + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -187,14 +201,16 @@ namespace 
detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -277,6 +293,7 @@ namespace detail { bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -285,27 +302,33 @@ namespace detail { ZenUpstreamEndpoint(std::string_view ServiceUrl) { using namespace fmt::literals; - m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_DisplayName = "Zen - {}"_format(ServiceUrl); m_Client = new ZenStructuredCacheClient(ServiceUrl); } ~ZenUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk; } + + virtual UpstreamEndpointHealth CheckHealth() override { try { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; + for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) { Result = Session.SayHello(); } - return Result.Success; + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk}; } - catch (std::exception&) + catch (std::exception& Err) { - return false; + return {.Reason = Err.what(), .Ok = false}; } } @@ -318,14 +341,16 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = 
Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -337,14 +362,16 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -390,6 +417,8 @@ namespace detail { CacheRecord.CacheKey.Hash, PackagePayload, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes = Result.Bytes; @@ -406,6 +435,8 @@ namespace detail { CacheRecord.CacheKey.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -425,6 +456,8 @@ namespace detail { { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -435,6 +468,7 @@ namespace detail { } catch (std::exception& e) { + m_HealthOk = false; return {.Reason = std::string(e.what()), .Success = false}; } } @@ -442,6 +476,7 @@ namespace detail { private: std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; } // namespace detail @@ -548,27 +583,35 @@ public: virtual bool Initialize() override { - auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { - const bool Ok = Endpoint->Initialize(); - ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? 
"OK" : "FAILED"); - return !Ok; - }); + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); + if (Health.Ok) + { + ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } - m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); - m_IsRunning = !m_Endpoints.empty(); + m_RunState.IsRunning = !m_Endpoints.empty(); - if (m_IsRunning) + if (m_RunState.IsRunning) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); } - return m_IsRunning; + return m_RunState.IsRunning; } - virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { @@ -576,10 +619,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } } @@ -593,10 +639,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return 
Result; + } } } } @@ -606,7 +655,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load() && m_Options.WriteUpstream) + if (m_RunState.IsRunning && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -655,18 +704,21 @@ private: for (auto& Endpoint : m_Endpoints) { - const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - if (Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), - Result.Reason); + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -688,33 +740,82 @@ private: } } - if (!m_IsRunning.load()) + if (!m_RunState.IsRunning) { break; } } } + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock lk(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + void Shutdown() { - if (m_IsRunning.load()) + if (m_RunState.Stop()) { - m_IsRunning.store(false); m_UpstreamQueue.CompleteAdding(); - 
for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } + m_EndpointMonitorThread.join(); m_UpstreamThreads.clear(); m_Endpoints.clear(); } } + spdlog::logger& Log() { return m_Log; } + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& Log() { return m_Log; } + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::lock_guard _(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; spdlog::logger& m_Log; UpstreamCacheOptions m_Options; @@ -724,7 +825,8 @@ private: UpstreamStats m_Stats; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + std::thread m_EndpointMonitorThread; + RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// |