diff options
| author | Per Larsson <[email protected]> | 2021-09-23 17:24:56 +0200 |
|---|---|---|
| committer | Per Larsson <[email protected]> | 2021-09-23 17:24:56 +0200 |
| commit | cbf8832318cd47d6379884d622dbceca8dea3e8c (patch) | |
| tree | 62310fd3b33f40cc9bbb9e454a05eb85a5fe1ca2 /zenserver/upstream/upstreamcache.cpp | |
| parent | Merge branch 'main' of https://github.com/EpicGames/zen (diff) | |
| download | zen-cbf8832318cd47d6379884d622dbceca8dea3e8c.tar.xz zen-cbf8832318cd47d6379884d622dbceca8dea3e8c.zip | |
Simpler upstream stats. Enabled with --upstream-stats.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 203 |
1 files changed, 111 insertions, 92 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 9d43462c0..f056c1c76 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -153,6 +153,7 @@ namespace detail { CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + Result.Bytes += AttachmentResult.Bytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; Result.ErrorCode = AttachmentResult.ErrorCode; @@ -176,7 +177,6 @@ namespace detail { Package.Save(Writer); Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - Result.Bytes = MemStream.Size(); } } } @@ -247,21 +247,26 @@ namespace detail { } else { + bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + Result.Success) + { + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + Success = true; + break; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - - if (!Result.Success) + if (!Success) { return {.Reason = "Failed to upload payload", .Bytes = TotalBytes, @@ -270,29 +275,38 @@ namespace detail { } } - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) + Success = false; + for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) { - Result = - Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); + if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RecordValue, + ZenContentType::kCbObject); + Result.Success) + { + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + Success = true; + break; + } } - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; } } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Reason = std::string(Err.what()), .Success = false}; } } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + private: bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; @@ -473,9 +487,12 @@ namespace detail { } } + virtual UpstreamEndpointStats& Stats() override { return m_Stats; } + private: std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; + UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; }; @@ -483,87 +500,95 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -class UpstreamStats final +struct UpstreamStats { static constexpr uint64_t MaxSampleCount = 100ull; - struct StatCounters - { - int64_t Bytes = {}; - int64_t Count = {}; - double Seconds = {}; - }; - - using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>; - - struct EndpointStats - { - mutable std::mutex Lock; - StatsMap Counters; - }; - -public: - UpstreamStats() : m_Log(logging::Get("upstream")) {} + UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} - void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) + void Add(spdlog::logger& Logger, + UpstreamEndpoint& Endpoint, + const GetUpstreamCacheResult& Result, + const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - std::unique_lock Lock(m_DownStats.Lock); + if (!m_Enabled) + { + return; + } - auto& Counters = m_DownStats.Counters[&Endpoint]; - Counters.Bytes += Result.Bytes; - Counters.Seconds += Result.ElapsedSeconds; - Counters.Count++; + UpstreamEndpointStats& Stats = Endpoint.Stats(); + if (Result.Success) + { + Stats.HitCount++; + Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + } + else + { + Stats.MissCount++; + } - if (Counters.Count >= MaxSampleCount) + if (m_SampleCount++ % MaxSampleCount) { - LogStats("STATS - (downstream):"sv, m_DownStats.Counters); - Counters = StatCounters{}; + Dump(Logger, Endpoints); } } - void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) + void Add(spdlog::logger& Logger, + UpstreamEndpoint& Endpoint, + const PutUpstreamCacheResult& Result, + const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - std::unique_lock Lock(m_UpStats.Lock); + if (!m_Enabled) + { + return; + } - auto& Counters = m_UpStats.Counters[&Endpoint]; - Counters.Bytes += Result.Bytes; - Counters.Seconds += Result.ElapsedSeconds; - Counters.Count++; + UpstreamEndpointStats& Stats = Endpoint.Stats(); + if (Result.Success) + { + Stats.UpCount++; + Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); + } - if (Counters.Count >= MaxSampleCount) + if (m_SampleCount++ % MaxSampleCount) { - LogStats("STATS - (upstream):"sv, m_UpStats.Counters); - Counters = StatCounters{}; + Dump(Logger, Endpoints); } } -private: - void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats) + void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - for (const auto& Kv : EndpointStats) + for (auto& Ep : Endpoints) { - const UpstreamEndpoint& Endpoint = *Kv.first; - const StatCounters& Counters = Kv.second; - const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; - - ZEN_UNUSED(Endpoint); - - ZEN_INFO("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", - What, - Kv.first->DisplayName(), - TotalMb, - Counters.Seconds, - TotalMb / Counters.Seconds, - (Counters.Seconds * 1000.0) / double(Counters.Count), - Counters.Count); + // These stats will not be totally correct as the numbers are not captured atomically + + UpstreamEndpointStats& Stats = Ep->Stats(); + const uint64_t HitCount = Stats.HitCount; + const uint64_t MissCount = Stats.MissCount; + const double DownBytes = Stats.DownBytes; + const double SecondsDown = Stats.SecondsDown; + const double UpBytes = Stats.UpBytes; + const double SecondsUp = Stats.SecondsUp; + + const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; + const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; + const uint64_t TotalCount = HitCount + MissCount; + const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0; + + Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + HitRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); } } - spdlog::logger& Log() { return m_Log; } - - spdlog::logger& m_Log; - EndpointStats m_UpStats; - EndpointStats m_DownStats; + bool m_Enabled; + std::atomic_uint64_t m_SampleCount = {}; }; ////////////////////////////////////////////////////////////////////////// @@ -576,6 +601,7 @@ public: , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) { } @@ -621,9 +647,11 @@ public: { if (Endpoint->IsHealthy()) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (Result.Success) { - m_Stats.Add(*Endpoint, Result); return Result; } } @@ -641,9 +669,11 @@ public: { if (Endpoint->IsHealthy()) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (Result.Success) { - m_Stats.Add(*Endpoint, Result); return Result; } } @@ -707,18 +737,7 @@ private: if (Endpoint->IsHealthy()) { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - if (Result.Success) - { - m_Stats.Add(*Endpoint, Result); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), - Result.Reason); - } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); } } } |