From 82a032af24dfefa508c384536e6b5b7dbe65ccb8 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 13:29:02 +0200 Subject: Improved error handling for upstream endpoints. --- zenserver/upstream/upstreamcache.cpp | 86 ++++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 29 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 03054b542..58a5b1ff3 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -186,16 +186,23 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -206,16 +213,23 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -429,16 +443,23 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -450,16 +471,23 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -575,7 +603,7 @@ namespace detail { struct UpstreamStats { - static constexpr uint64_t MaxSampleCount = 100ull; + static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} -- cgit v1.2.3 From be6e8143f2ea0c4acc87608f651058627b2c229c Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Fri, 1 Oct 2021 14:45:59 +0200 Subject: Added cache HIT/MISS stat counters. --- zenserver/upstream/upstreamcache.cpp | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 58a5b1ff3..b1966e299 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -612,11 +612,6 @@ struct UpstreamStats const GetUpstreamCacheResult& Result, const std::vector>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) @@ -634,7 +629,7 @@ struct UpstreamStats Stats.MissCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -645,11 +640,6 @@ struct UpstreamStats const PutUpstreamCacheResult& Result, const std::vector>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { @@ -662,7 +652,7 @@ struct UpstreamStats Stats.ErrorCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } -- cgit v1.2.3 From 22d25f59c7ead3de0b5d335684242b7364bce8f1 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Sat, 2 Oct 2021 16:31:52 +0200 Subject: Added support for choosing best ZEN upstream endpoint based on latency. --- zenserver/upstream/upstreamcache.cpp | 100 ++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 7 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index b1966e299..a183e7ebf 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -111,6 +111,8 @@ namespace detail { virtual ~JupiterUpstreamEndpoint() = default; + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + virtual bool IsHealthy() const override { return m_HealthOk.load(); } virtual UpstreamEndpointHealth CheckHealth() override @@ -400,22 +402,70 @@ namespace detail { class ZenUpstreamEndpoint final : public UpstreamEndpoint { + struct ZenEndpoint + { + std::string Url; + std::string Reason; + double Latency{}; + bool Ok = false; + + bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; } + }; + public: - ZenUpstreamEndpoint(std::string_view ServiceUrl) + ZenUpstreamEndpoint(std::span Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") { - using namespace fmt::literals; - m_DisplayName = "Zen - {}"_format(ServiceUrl); - m_Client = new ZenStructuredCacheClient(ServiceUrl); + for (const auto& Url : Urls) + { + m_Endpoints.push_back({.Url = Url}); + } } ~ZenUpstreamEndpoint() = default; + virtual UpstreamEndpointHealth Initialize() override + { + using namespace fmt::literals; + + const ZenEndpoint& Ep = GetEndpoint(); + if (Ep.Ok) + { + m_ServiceUrl = Ep.Url; + m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + + m_HealthOk = true; + return {.Ok = true}; + } + + m_HealthOk = false; + return {.Reason = Ep.Reason}; + } + virtual bool IsHealthy() const override { return m_HealthOk; } virtual UpstreamEndpointHealth CheckHealth() override { + using namespace fmt::literals; + try { + if (m_Client.IsNull()) + { + const ZenEndpoint& Ep = GetEndpoint(); + if (Ep.Ok) + { + m_ServiceUrl = Ep.Url; + m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + + m_HealthOk = true; + return {.Ok = true}; + } + + return {.Reason = Ep.Reason}; + } + ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; @@ -591,6 +641,42 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + const ZenEndpoint& GetEndpoint() + { + for (ZenEndpoint& Ep : m_Endpoints) + { + ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheSession Session(Client); + const int32_t SampleCount = 2; + + Ep.Ok = false; + Ep.Latency = {}; + + for (int32_t Sample = 0; Sample < SampleCount; ++Sample) + { + ZenCacheResult Result = Session.CheckHealth(); + Ep.Ok = Result.Success; + Ep.Reason = std::move(Result.Reason); + Ep.Latency += Result.ElapsedSeconds; + } + Ep.Latency /= double(SampleCount); + } + + std::sort(std::begin(m_Endpoints), std::end(m_Endpoints)); + + for (const auto& Ep : m_Endpoints) + { + ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); + } + + return m_Endpoints.front(); + } + + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::vector m_Endpoints; std::string m_DisplayName; RefPtr m_Client; UpstreamEndpointStats m_Stats; @@ -711,7 +797,7 @@ public: { for (auto& Endpoint : m_Endpoints) { - const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); + const UpstreamEndpointHealth Health = Endpoint->Initialize(); if (Health.Ok) { ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); @@ -993,9 +1079,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr -MakeZenUpstreamEndpoint(std::string_view Url) +MakeZenUpstreamEndpoint(std::span Urls) { - return std::make_unique(Url); + return std::make_unique(Urls); } } // namespace zen -- cgit v1.2.3 From 20ac7384f8ca558f1fb933eda846604792240ea0 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Tue, 5 Oct 2021 22:25:53 +0200 Subject: Merged from upstream --- zenserver/upstream/upstreamcache.cpp | 68 ++---------------------------------- 1 file changed, 2 insertions(+), 66 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index a183e7ebf..ba5991b3f 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -23,7 +24,6 @@ #include #include -#include #include #include @@ -33,70 +33,6 @@ using namespace std::literals; namespace detail { - template - class BlockingQueue - { - public: - BlockingQueue() = default; - - ~BlockingQueue() { CompleteAdding(); } - - void Enqueue(T&& Item) - { - { - std::lock_guard Lock(m_Lock); - m_Queue.emplace_back(std::move(Item)); - m_Size++; - } - - m_NewItemSignal.notify_one(); - } - - bool WaitAndDequeue(T& Item) - { - if (m_CompleteAdding.load()) - { - return false; - } - - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; - } - - return false; - } - - void CompleteAdding() - { - if (!m_CompleteAdding.load()) - { - m_CompleteAdding.store(true); - m_NewItemSignal.notify_all(); - } - } - - std::size_t Size() const - { - std::unique_lock Lock(m_Lock); - return m_Queue.size(); - } - - private: - mutable std::mutex m_Lock; - std::condition_variable m_NewItemSignal; - std::deque m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; - }; - class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -1029,7 +965,7 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = detail::BlockingQueue; + using UpstreamQueue = BlockingQueue; struct RunState { -- cgit v1.2.3 From cc372979a47512f927ba8fdae6c3151df7211f45 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Tue, 5 Oct 2021 22:31:34 +0200 Subject: clang-format --- zenserver/upstream/upstreamcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index ba5991b3f..9ed392faf 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,11 +4,11 @@ #include "jupiter.h" #include "zen.h" +#include #include #include #include #include -#include #include #include #include -- cgit v1.2.3 From fa48ebf89e06edc9d3bdd26b119417df20902bdd Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 6 Oct 2021 13:59:18 +0200 Subject: Support for asynchronous HTTP response processing (#19) This change introduces WriteResponseAsync which can be used to move potentially slow request handler code (like upstream lookups) off the I/O service thread to ensure we are always able to serve as many HTTP requests as possible. The current implementation defaults to 16 async worker threads and there is currently no back-pressure. - Added RequestStats - Metrics for network requests. Aggregates tracking of duration, payload sizes into a single class for ease of use - Added some metrics on upstream communication Co-authored-by: Per Larsson --- zenserver/upstream/upstreamcache.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 9ed392faf..5b2629f72 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -41,7 +41,7 @@ namespace detail { , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; - m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); m_Client = new CloudCacheClient(Options); } -- cgit v1.2.3