diff options
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 162 |
1 files changed, 143 insertions, 19 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index f7a91acb5..9e566749a 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -17,7 +17,9 @@ #include <algorithm> #include <atomic> #include <deque> +#include <limits> #include <thread> +#include <unordered_map> namespace zen { @@ -122,7 +124,10 @@ namespace detail { Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); } - return {.Value = Result.Value, .Success = Result.Success}; + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -136,7 +141,11 @@ namespace detail { { zen::CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Value = Result.Value, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -173,10 +182,13 @@ namespace detail { } } - return {.Success = Result.Success}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { CloudCacheResult Result; @@ -185,24 +197,29 @@ namespace detail { Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - } - - return {.Success = Result.Success}; + Result = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } } catch (std::exception& e) @@ -243,7 +260,11 @@ namespace detail { { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -258,7 +279,11 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -277,6 +302,8 @@ namespace detail { { zen::ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { @@ -289,9 +316,15 @@ namespace detail { Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } @@ -301,7 +334,10 @@ namespace detail { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - return {.Success = Result.Success}; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { @@ -318,6 +354,87 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// +class UpstreamStats final +{ + static constexpr uint64_t MaxSampleCount = 100ull; + + struct StatCounters + { + int64_t Bytes = {}; + int64_t Count = {}; + double Seconds = {}; + }; + + using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>; + + struct EndpointStats + { + mutable std::mutex Lock; + StatsMap Counters; + }; + +public: + UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} + + void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_DownStats.Lock); + + auto& Counters = m_DownStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (downstream):"sv, m_DownStats.Counters); + Counters = StatCounters{}; + } + } + + void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_UpStats.Lock); + + auto& Counters = m_UpStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (upstream):"sv, m_UpStats.Counters); + Counters = StatCounters{}; + } + } + +private: + void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats) + { + for (const auto& Kv : EndpointStats) + { + const UpstreamEndpoint& Endpoint = *Kv.first; + const StatCounters& Counters = Kv.second; + const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; + + m_Log.info("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", + What, + Kv.first->DisplayName(), + TotalMb, + Counters.Seconds, + TotalMb / Counters.Seconds, + (Counters.Seconds * 1000.0) / double(Counters.Count), + Counters.Count); + } + } + + spdlog::logger& m_Log; + EndpointStats m_UpStats; + EndpointStats m_DownStats; +}; + +////////////////////////////////////////////////////////////////////////// + class DefaultUpstreamCache final : public UpstreamCache { public: @@ -362,6 +479,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -375,6 +493,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -425,7 +544,11 @@ private: for (auto& Endpoint : m_Endpoints) { - Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } } } @@ -477,6 +600,7 @@ private: ::ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; std::atomic_bool m_IsRunning{false}; |