From c07e8c760ed56db387b6612ed8392d6e78301479 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 7 Sep 2021 15:11:11 +0200 Subject: Return stats from Zen/Jupiter HTTP client. --- zenserver/upstream/upstreamcache.cpp | 183 +++++++++++++++++++++++++++++++---- 1 file changed, 164 insertions(+), 19 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index f7a91acb5..04ac40a39 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -16,8 +16,10 @@ #include #include +#include #include #include +#include namespace zen { @@ -122,7 +124,10 @@ namespace detail { Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); } - return {.Value = Result.Value, .Success = Result.Success}; + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -136,7 +141,11 @@ namespace detail { { zen::CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Value = Result.Value, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -173,10 +182,13 @@ namespace detail { } } - return {.Success = Result.Success}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { + int64_t TotalBytes = 0; + double TotalElapsedSeconds = 0.0; + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { CloudCacheResult Result; @@ -185,24 +197,29 @@ namespace detail { Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - } - - return {.Success = Result.Success}; + Result = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } } catch (std::exception& e) @@ -243,7 +260,11 @@ namespace detail { { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -258,7 +279,11 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -277,6 +302,8 @@ namespace detail { { zen::ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; + int64_t TotalBytes = 0; + double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { @@ -289,9 +316,15 @@ namespace detail { Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } @@ -301,7 +334,10 @@ namespace detail { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - return {.Success = Result.Success}; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { @@ -318,6 +354,108 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// +class UpstreamStats final +{ + using clock = std::chrono::steady_clock; + using time_point = std::chrono::time_point; + + struct StatCounters + { + int64_t Bytes = {}; + int64_t Count = {}; + double Seconds = {}; + }; + + struct EndpointStats + { + mutable std::mutex Lock; + std::unordered_map Counters; + }; + +public: + UpstreamStats() : m_Log(zen::logging::Get("upstreamstats")) { m_LastTime = clock::now(); } + + void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_DownStats.Lock); + + auto& Counters = m_DownStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + LogStats(); + } + + void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_UpStats.Lock); + + auto& Counters = m_UpStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + LogStats(); + } + +private: + void LogStats() + { + using std::chrono::duration_cast; + using std::chrono::seconds; + + const seconds Duration = duration_cast(clock::now() - m_LastTime); + if (Duration < seconds(5)) + { + return; + } + + { + std::unique_lock Lock(m_DownStats.Lock); + for (const auto& Kv : m_DownStats.Counters) + { + const UpstreamEndpoint& Endpoint = *Kv.first; + const StatCounters& Counters = Kv.second; + + m_Log.info("Endpoint: {}, Total downloaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)", + Kv.first->DisplayName(), + double(Counters.Bytes) / 1024.0, + Counters.Seconds, + Counters.Count, + (double(Counters.Bytes) / 1024.0) / Counters.Seconds, + double(Counters.Count) / Counters.Seconds); + } + } + + { + std::unique_lock Lock(m_UpStats.Lock); + for (const auto& Kv : m_UpStats.Counters) + { + const UpstreamEndpoint& Endpoint = *Kv.first; + const StatCounters& Counters = Kv.second; + + m_Log.info("Endpoint: {}, Total uploaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)", + Kv.first->DisplayName(), + double(Counters.Bytes) / 1024.0, + Counters.Seconds, + Counters.Count, + (double(Counters.Bytes) / 1024.0) / Counters.Seconds, + double(Counters.Count) / Counters.Seconds); + } + } + + m_LastTime = clock::now(); + } + + spdlog::logger& m_Log; + EndpointStats m_UpStats; + EndpointStats m_DownStats; + time_point m_LastTime = {}; +}; + +////////////////////////////////////////////////////////////////////////// + class DefaultUpstreamCache final : public UpstreamCache { public: @@ -362,6 +500,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -375,6 +514,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -425,7 +565,11 @@ private: for (auto& Endpoint : m_Endpoints) { - Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } } } @@ -477,6 +621,7 @@ private: ::ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; std::vector> m_Endpoints; std::vector m_UpstreamThreads; std::atomic_bool m_IsRunning{false}; -- cgit v1.2.3 From 6356b19b0f717f40d388b930c63d4811e757fe50 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 7 Sep 2021 20:01:06 +0200 Subject: Updated upstream stats calculation. --- zenserver/upstream/upstreamcache.cpp | 83 ++++++++++++++---------------------- 1 file changed, 31 insertions(+), 52 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 04ac40a39..c1002a6da 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -16,8 +16,8 @@ #include #include -#include #include +#include #include #include @@ -356,8 +356,7 @@ namespace detail { class UpstreamStats final { - using clock = std::chrono::steady_clock; - using time_point = std::chrono::time_point; + static constexpr uint64_t MaxSampleCount = 100ull; struct StatCounters { @@ -366,14 +365,16 @@ class UpstreamStats final double Seconds = {}; }; + using StatsMap = std::unordered_map; + struct EndpointStats { - mutable std::mutex Lock; - std::unordered_map Counters; + mutable std::mutex Lock; + StatsMap Counters; }; public: - UpstreamStats() : m_Log(zen::logging::Get("upstreamstats")) { m_LastTime = clock::now(); } + UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) { @@ -384,7 +385,11 @@ public: Counters.Seconds += Result.ElapsedSeconds; Counters.Count++; - LogStats(); + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (downstream):"sv, m_DownStats.Counters); + Counters = StatCounters{}; + } } void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) @@ -396,62 +401,36 @@ public: Counters.Seconds += Result.ElapsedSeconds; Counters.Count++; - LogStats(); + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (upstream):"sv, m_UpStats.Counters); + Counters = StatCounters{}; + } } private: - void LogStats() + void LogStats(std::string_view What, const std::unordered_map& EndpointStats) { - using std::chrono::duration_cast; - using std::chrono::seconds; - - const seconds Duration = duration_cast(clock::now() - m_LastTime); - if (Duration < seconds(5)) + for (const auto& Kv : EndpointStats) { - return; + const UpstreamEndpoint& Endpoint = *Kv.first; + const StatCounters& Counters = Kv.second; + const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; + + m_Log.info("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", + What, + Kv.first->DisplayName(), + TotalMb, + Counters.Seconds, + TotalMb / Counters.Seconds, + (Counters.Seconds * 1000.0) / double(Counters.Count), + Counters.Count); } - - { - std::unique_lock Lock(m_DownStats.Lock); - for (const auto& Kv : m_DownStats.Counters) - { - const UpstreamEndpoint& Endpoint = *Kv.first; - const StatCounters& Counters = Kv.second; - - m_Log.info("Endpoint: {}, Total downloaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)", - Kv.first->DisplayName(), - double(Counters.Bytes) / 1024.0, - Counters.Seconds, - Counters.Count, - (double(Counters.Bytes) / 1024.0) / Counters.Seconds, - double(Counters.Count) / Counters.Seconds); - } - } - - { - std::unique_lock Lock(m_UpStats.Lock); - for (const auto& Kv : m_UpStats.Counters) - { - const UpstreamEndpoint& Endpoint = *Kv.first; - const StatCounters& Counters = Kv.second; - - m_Log.info("Endpoint: {}, Total uploaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)", - Kv.first->DisplayName(), - double(Counters.Bytes) / 1024.0, - Counters.Seconds, - Counters.Count, - (double(Counters.Bytes) / 1024.0) / Counters.Seconds, - double(Counters.Count) / Counters.Seconds); - } - } - - m_LastTime = clock::now(); } spdlog::logger& m_Log; EndpointStats m_UpStats; EndpointStats m_DownStats; - time_point m_LastTime = {}; }; ////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3 From e0f6f284fc3bb0996628db058130760ae1d650c6 Mon Sep 17 00:00:00 2001 From: Per Larsson Date: Tue, 7 Sep 2021 20:10:46 +0200 Subject: Minor fixup and formatting. --- zenserver/upstream/upstreamcache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'zenserver/upstream/upstreamcache.cpp') diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index c1002a6da..9e566749a 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -186,7 +186,7 @@ namespace detail { } else { - int64_t TotalBytes = 0; + int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) @@ -302,7 +302,7 @@ namespace detail { { zen::ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; - int64_t TotalBytes = 0; + int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) -- cgit v1.2.3