aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-23 17:24:56 +0200
committerPer Larsson <[email protected]>2021-09-23 17:24:56 +0200
commitcbf8832318cd47d6379884d622dbceca8dea3e8c (patch)
tree62310fd3b33f40cc9bbb9e454a05eb85a5fe1ca2 /zenserver/upstream/upstreamcache.cpp
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-cbf8832318cd47d6379884d622dbceca8dea3e8c.tar.xz
zen-cbf8832318cd47d6379884d622dbceca8dea3e8c.zip
Simpler upstream stats. Enabled with --upstream-stats.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp203
1 files changed, 111 insertions, 92 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 9d43462c0..f056c1c76 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -153,6 +153,7 @@ namespace detail {
CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ Result.Bytes += AttachmentResult.Bytes;
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
Result.ErrorCode = AttachmentResult.ErrorCode;
@@ -176,7 +177,6 @@ namespace detail {
Package.Save(Writer);
Result.Response = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- Result.Bytes = MemStream.Size();
}
}
}
@@ -247,21 +247,26 @@ namespace detail {
}
else
{
+ bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ Result.Success)
+ {
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ Success = true;
+ break;
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
-
- if (!Result.Success)
+ if (!Success)
{
return {.Reason = "Failed to upload payload",
.Bytes = TotalBytes,
@@ -270,29 +275,38 @@ namespace detail {
}
}
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
+ Success = false;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
{
- Result =
- Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
+ if (CloudCacheResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RecordValue,
+ ZenContentType::kCbObject);
+ Result.Success)
+ {
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+ Success = true;
+ break;
+ }
}
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
-
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
}
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Reason = std::string(Err.what()), .Success = false};
}
}
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+
private:
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
+ UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
};
@@ -473,9 +487,12 @@ namespace detail {
}
}
+ virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
+
private:
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
+ UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
};
@@ -483,87 +500,95 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-class UpstreamStats final
+struct UpstreamStats
{
static constexpr uint64_t MaxSampleCount = 100ull;
- struct StatCounters
- {
- int64_t Bytes = {};
- int64_t Count = {};
- double Seconds = {};
- };
-
- using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>;
-
- struct EndpointStats
- {
- mutable std::mutex Lock;
- StatsMap Counters;
- };
-
-public:
- UpstreamStats() : m_Log(logging::Get("upstream")) {}
+ UpstreamStats(bool Enabled) : m_Enabled(Enabled) {}
- void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
+ void Add(spdlog::logger& Logger,
+ UpstreamEndpoint& Endpoint,
+ const GetUpstreamCacheResult& Result,
+ const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- std::unique_lock Lock(m_DownStats.Lock);
+ if (!m_Enabled)
+ {
+ return;
+ }
- auto& Counters = m_DownStats.Counters[&Endpoint];
- Counters.Bytes += Result.Bytes;
- Counters.Seconds += Result.ElapsedSeconds;
- Counters.Count++;
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+ if (Result.Success)
+ {
+ Stats.HitCount++;
+ Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ }
+ else
+ {
+ Stats.MissCount++;
+ }
- if (Counters.Count >= MaxSampleCount)
+ if (m_SampleCount++ % MaxSampleCount)
{
- LogStats("STATS - (downstream):"sv, m_DownStats.Counters);
- Counters = StatCounters{};
+ Dump(Logger, Endpoints);
}
}
- void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result)
+ void Add(spdlog::logger& Logger,
+ UpstreamEndpoint& Endpoint,
+ const PutUpstreamCacheResult& Result,
+ const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- std::unique_lock Lock(m_UpStats.Lock);
+ if (!m_Enabled)
+ {
+ return;
+ }
- auto& Counters = m_UpStats.Counters[&Endpoint];
- Counters.Bytes += Result.Bytes;
- Counters.Seconds += Result.ElapsedSeconds;
- Counters.Count++;
+ UpstreamEndpointStats& Stats = Endpoint.Stats();
+ if (Result.Success)
+ {
+ Stats.UpCount++;
+ Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
+ }
- if (Counters.Count >= MaxSampleCount)
+ if (m_SampleCount++ % MaxSampleCount)
{
- LogStats("STATS - (upstream):"sv, m_UpStats.Counters);
- Counters = StatCounters{};
+ Dump(Logger, Endpoints);
}
}
-private:
- void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats)
+ void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints)
{
- for (const auto& Kv : EndpointStats)
+ for (auto& Ep : Endpoints)
{
- const UpstreamEndpoint& Endpoint = *Kv.first;
- const StatCounters& Counters = Kv.second;
- const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0;
-
- ZEN_UNUSED(Endpoint);
-
- ZEN_INFO("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}",
- What,
- Kv.first->DisplayName(),
- TotalMb,
- Counters.Seconds,
- TotalMb / Counters.Seconds,
- (Counters.Seconds * 1000.0) / double(Counters.Count),
- Counters.Count);
+ // These stats will not be totally correct as the numbers are not captured atomically
+
+ UpstreamEndpointStats& Stats = Ep->Stats();
+ const uint64_t HitCount = Stats.HitCount;
+ const uint64_t MissCount = Stats.MissCount;
+ const double DownBytes = Stats.DownBytes;
+ const double SecondsDown = Stats.SecondsDown;
+ const double UpBytes = Stats.UpBytes;
+ const double SecondsUp = Stats.SecondsUp;
+
+ const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
+ const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
+ const uint64_t TotalCount = HitCount + MissCount;
+ const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) * 100.0 : 0.0;
+
+ Logger.info("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ HitRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
}
}
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- EndpointStats m_UpStats;
- EndpointStats m_DownStats;
+ bool m_Enabled;
+ std::atomic_uint64_t m_SampleCount = {};
};
//////////////////////////////////////////////////////////////////////////
@@ -576,6 +601,7 @@ public:
, m_Options(Options)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
+ , m_Stats(Options.StatsEnabled)
{
}
@@ -621,9 +647,11 @@ public:
{
if (Endpoint->IsHealthy())
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ const GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type);
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (Result.Success)
{
- m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -641,9 +669,11 @@ public:
{
if (Endpoint->IsHealthy())
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey);
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (Result.Success)
{
- m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -707,18 +737,7 @@ private:
if (Endpoint->IsHealthy())
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- if (Result.Success)
- {
- m_Stats.Add(*Endpoint, Result);
- }
- else
- {
- ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- Endpoint->DisplayName(),
- Result.Reason);
- }
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
}
}
}