aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
committerMartin Ridgers <[email protected]>2021-10-07 08:29:50 +0200
commit03232621d183f22e12e798a753e4a606763e63d6 (patch)
tree5701d202392dd4ab947139e4046a44ab9bc6cdf7 /zenserver/upstream/upstreamcache.cpp
parentMerged main (diff)
parentOnly enable the MSVC debug output sink for sessions when the --debug mode is ... (diff)
downloadzen-03232621d183f22e12e798a753e4a606763e63d6.tar.xz
zen-03232621d183f22e12e798a753e4a606763e63d6.zip
Merged main
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp270
1 files changed, 155 insertions, 115 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 03054b542..5b2629f72 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -4,6 +4,7 @@
#include "jupiter.h"
#include "zen.h"
+#include <zencore/blockingqueue.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -23,7 +24,6 @@
#include <algorithm>
#include <atomic>
-#include <deque>
#include <thread>
#include <unordered_map>
@@ -33,70 +33,6 @@ using namespace std::literals;
namespace detail {
- template<typename T>
- class BlockingQueue
- {
- public:
- BlockingQueue() = default;
-
- ~BlockingQueue() { CompleteAdding(); }
-
- void Enqueue(T&& Item)
- {
- {
- std::lock_guard Lock(m_Lock);
- m_Queue.emplace_back(std::move(Item));
- m_Size++;
- }
-
- m_NewItemSignal.notify_one();
- }
-
- bool WaitAndDequeue(T& Item)
- {
- if (m_CompleteAdding.load())
- {
- return false;
- }
-
- std::unique_lock Lock(m_Lock);
- m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); });
-
- if (!m_Queue.empty())
- {
- Item = std::move(m_Queue.front());
- m_Queue.pop_front();
- m_Size--;
-
- return true;
- }
-
- return false;
- }
-
- void CompleteAdding()
- {
- if (!m_CompleteAdding.load())
- {
- m_CompleteAdding.store(true);
- m_NewItemSignal.notify_all();
- }
- }
-
- std::size_t Size() const
- {
- std::unique_lock Lock(m_Lock);
- return m_Queue.size();
- }
-
- private:
- mutable std::mutex m_Lock;
- std::condition_variable m_NewItemSignal;
- std::deque<T> m_Queue;
- std::atomic_bool m_CompleteAdding{false};
- std::atomic_uint32_t m_Size;
- };
-
class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
@@ -105,12 +41,14 @@ namespace detail {
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
using namespace fmt::literals;
- m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl);
+ m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
virtual UpstreamEndpointHealth CheckHealth() override
@@ -186,16 +124,23 @@ namespace detail {
}
}
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -206,16 +151,23 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -386,22 +338,70 @@ namespace detail {
class ZenUpstreamEndpoint final : public UpstreamEndpoint
{
+ struct ZenEndpoint
+ {
+ std::string Url;
+ std::string Reason;
+ double Latency{};
+ bool Ok = false;
+
+ bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; }
+ };
+
public:
- ZenUpstreamEndpoint(std::string_view ServiceUrl)
+ ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
{
- using namespace fmt::literals;
- m_DisplayName = "Zen - {}"_format(ServiceUrl);
- m_Client = new ZenStructuredCacheClient(ServiceUrl);
+ for (const auto& Url : Urls)
+ {
+ m_Endpoints.push_back({.Url = Url});
+ }
}
~ZenUpstreamEndpoint() = default;
+ virtual UpstreamEndpointHealth Initialize() override
+ {
+ using namespace fmt::literals;
+
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ m_HealthOk = false;
+ return {.Reason = Ep.Reason};
+ }
+
virtual bool IsHealthy() const override { return m_HealthOk; }
virtual UpstreamEndpointHealth CheckHealth() override
{
+ using namespace fmt::literals;
+
try
{
+ if (m_Client.IsNull())
+ {
+ const ZenEndpoint& Ep = GetEndpoint();
+ if (Ep.Ok)
+ {
+ m_ServiceUrl = Ep.Url;
+ m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
+ m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+
+ m_HealthOk = true;
+ return {.Ok = true};
+ }
+
+ return {.Reason = Ep.Reason};
+ }
+
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
@@ -429,16 +429,23 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -450,16 +457,23 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Value = Result.Response,
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds,
- .Success = Result.Success};
+ if (Result.ErrorCode == 0)
+ {
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
+ }
+ else
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
}
catch (std::exception& Err)
{
- return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
}
}
@@ -563,6 +577,42 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ const ZenEndpoint& GetEndpoint()
+ {
+ for (ZenEndpoint& Ep : m_Endpoints)
+ {
+ ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheSession Session(Client);
+ const int32_t SampleCount = 2;
+
+ Ep.Ok = false;
+ Ep.Latency = {};
+
+ for (int32_t Sample = 0; Sample < SampleCount; ++Sample)
+ {
+ ZenCacheResult Result = Session.CheckHealth();
+ Ep.Ok = Result.Success;
+ Ep.Reason = std::move(Result.Reason);
+ Ep.Latency += Result.ElapsedSeconds;
+ }
+ Ep.Latency /= double(SampleCount);
+ }
+
+ std::sort(std::begin(m_Endpoints), std::end(m_Endpoints));
+
+ for (const auto& Ep : m_Endpoints)
+ {
+ ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ }
+
+ return m_Endpoints.front();
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::vector<ZenEndpoint> m_Endpoints;
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
@@ -575,7 +625,7 @@ namespace detail {
struct UpstreamStats
{
- static constexpr uint64_t MaxSampleCount = 100ull;
+ static constexpr uint64_t MaxSampleCount = 1000ull;
UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
@@ -584,11 +634,6 @@ struct UpstreamStats
const GetUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Error)
@@ -606,7 +651,7 @@ struct UpstreamStats
Stats.MissCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -617,11 +662,6 @@ struct UpstreamStats
const PutUpstreamCacheResult& Result,
const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- if (!m_Enabled)
- {
- return;
- }
-
UpstreamEndpointStats& Stats = Endpoint.Stats();
if (Result.Success)
{
@@ -634,7 +674,7 @@ struct UpstreamStats
Stats.ErrorCount++;
}
- if (m_SampleCount++ % MaxSampleCount)
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
{
Dump(Logger, Endpoints);
}
@@ -693,7 +733,7 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- const UpstreamEndpointHealth Health = Endpoint->CheckHealth();
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
if (Health.Ok)
{
ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
@@ -925,7 +965,7 @@ private:
spdlog::logger& Log() { return m_Log; }
- using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
+ using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>;
struct RunState
{
@@ -975,9 +1015,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::string_view Url)
+MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Url);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
}
} // namespace zen