aboutsummaryrefslogtreecommitdiff
path: root/zenserver/upstream/upstreamcache.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-09-07 15:11:11 +0200
committerPer Larsson <[email protected]>2021-09-07 15:11:11 +0200
commitc07e8c760ed56db387b6612ed8392d6e78301479 (patch)
treea77c19e56932b121ecd1f3e0dabced5843810897 /zenserver/upstream/upstreamcache.cpp
parentMerge branch 'main' of https://github.com/EpicGames/zen (diff)
downloadzen-c07e8c760ed56db387b6612ed8392d6e78301479.tar.xz
zen-c07e8c760ed56db387b6612ed8392d6e78301479.zip
Return stats from Zen/Jupiter HTTP client.
Diffstat (limited to 'zenserver/upstream/upstreamcache.cpp')
-rw-r--r--zenserver/upstream/upstreamcache.cpp183
1 files changed, 164 insertions, 19 deletions
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index f7a91acb5..04ac40a39 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -16,8 +16,10 @@
#include <algorithm>
#include <atomic>
+#include <chrono>
#include <deque>
#include <thread>
+#include <unordered_map>
namespace zen {
@@ -122,7 +124,10 @@ namespace detail {
Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
}
- return {.Value = Result.Value, .Success = Result.Success};
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -136,7 +141,11 @@ namespace detail {
{
zen::CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- return {.Value = Result.Value, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -173,10 +182,13 @@ namespace detail {
}
}
- return {.Success = Result.Success};
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success};
}
else
{
+ int64_t TotalBytes = 0;
+ double TotalElapsedSeconds = 0.0;
+
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
CloudCacheResult Result;
@@ -185,24 +197,29 @@ namespace detail {
Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
}
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload", .Success = false};
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
}
}
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- }
-
- return {.Success = Result.Success};
+ Result =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
}
catch (std::exception& e)
@@ -243,7 +260,11 @@ namespace detail {
{
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- return {.Value = Result.Response, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -258,7 +279,11 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- return {.Value = Result.Response, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -277,6 +302,8 @@ namespace detail {
{
zen::ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
+ int64_t TotalBytes = 0;
+ double TotalElapsedSeconds = 0.0;
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
@@ -289,9 +316,15 @@ namespace detail {
Payloads[Idx]);
}
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload", .Success = false};
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
}
}
@@ -301,7 +334,10 @@ namespace detail {
Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
}
- return {.Success = Result.Success};
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -318,6 +354,108 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
+class UpstreamStats final
+{
+ using clock = std::chrono::steady_clock;
+ using time_point = std::chrono::time_point<clock>;
+
+ struct StatCounters
+ {
+ int64_t Bytes = {};
+ int64_t Count = {};
+ double Seconds = {};
+ };
+
+ struct EndpointStats
+ {
+ mutable std::mutex Lock;
+ std::unordered_map<const UpstreamEndpoint*, StatCounters> Counters;
+ };
+
+public:
+ UpstreamStats() : m_Log(zen::logging::Get("upstreamstats")) { m_LastTime = clock::now(); }
+
+ void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
+ {
+ std::unique_lock Lock(m_DownStats.Lock);
+
+ auto& Counters = m_DownStats.Counters[&Endpoint];
+ Counters.Bytes += Result.Bytes;
+ Counters.Seconds += Result.ElapsedSeconds;
+ Counters.Count++;
+
+ LogStats();
+ }
+
+ void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result)
+ {
+ std::unique_lock Lock(m_UpStats.Lock);
+
+ auto& Counters = m_UpStats.Counters[&Endpoint];
+ Counters.Bytes += Result.Bytes;
+ Counters.Seconds += Result.ElapsedSeconds;
+ Counters.Count++;
+
+ LogStats();
+ }
+
+private:
+ void LogStats()
+ {
+ using std::chrono::duration_cast;
+ using std::chrono::seconds;
+
+ const seconds Duration = duration_cast<seconds>(clock::now() - m_LastTime);
+ if (Duration < seconds(5))
+ {
+ return;
+ }
+
+ {
+ std::unique_lock Lock(m_DownStats.Lock);
+ for (const auto& Kv : m_DownStats.Counters)
+ {
+ const UpstreamEndpoint& Endpoint = *Kv.first;
+ const StatCounters& Counters = Kv.second;
+
+ m_Log.info("Endpoint: {}, Total downloaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)",
+ Kv.first->DisplayName(),
+ double(Counters.Bytes) / 1024.0,
+ Counters.Seconds,
+ Counters.Count,
+ (double(Counters.Bytes) / 1024.0) / Counters.Seconds,
+ double(Counters.Count) / Counters.Seconds);
+ }
+ }
+
+ {
+ std::unique_lock Lock(m_UpStats.Lock);
+ for (const auto& Kv : m_UpStats.Counters)
+ {
+ const UpstreamEndpoint& Endpoint = *Kv.first;
+ const StatCounters& Counters = Kv.second;
+
+ m_Log.info("Endpoint: {}, Total uploaded: {}Kb, Total time: {}s, Count: {}, Speed: {} Kb/s, Avg: {} (Req/s)",
+ Kv.first->DisplayName(),
+ double(Counters.Bytes) / 1024.0,
+ Counters.Seconds,
+ Counters.Count,
+ (double(Counters.Bytes) / 1024.0) / Counters.Seconds,
+ double(Counters.Count) / Counters.Seconds);
+ }
+ }
+
+ m_LastTime = clock::now();
+ }
+
+ spdlog::logger& m_Log;
+ EndpointStats m_UpStats;
+ EndpointStats m_DownStats;
+ time_point m_LastTime = {};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
class DefaultUpstreamCache final : public UpstreamCache
{
public:
@@ -362,6 +500,7 @@ public:
{
if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
{
+ m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -375,6 +514,7 @@ public:
{
if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
{
+ m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -425,7 +565,11 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ }
}
}
@@ -477,6 +621,7 @@ private:
::ZenCacheStore& m_CacheStore;
CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
+ UpstreamStats m_Stats;
std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
std::vector<std::thread> m_UpstreamThreads;
std::atomic_bool m_IsRunning{false};