aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-08 16:14:51 +0200
committerStefan Boberg <[email protected]>2021-09-08 16:14:51 +0200
commit5f1146d00ff545ddb00e0f26784ee468b09b0efb (patch)
tree49c0007460c1fc924647f1f072971c08d3956f36
parentChanged logging defaults to be more suitable for production (diff)
parentMinor fixup and formatting. (diff)
downloadzen-5f1146d00ff545ddb00e0f26784ee468b09b0efb.tar.xz
zen-5f1146d00ff545ddb00e0f26784ee468b09b0efb.zip
Merge branch 'main' of https://github.com/EpicGames/zen
-rw-r--r--zenserver/upstream/jupiter.cpp33
-rw-r--r--zenserver/upstream/jupiter.h6
-rw-r--r--zenserver/upstream/upstreamcache.cpp162
-rw-r--r--zenserver/upstream/upstreamcache.h8
-rw-r--r--zenserver/upstream/zen.cpp20
-rw-r--r--zenserver/upstream/zen.h4
6 files changed, 177 insertions, 56 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 9a013963a..815e9014e 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -80,15 +80,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
Session.SetOption(cpr::Header{{"Authorization", Auth}});
cpr::Response Response = Session.Get();
-
m_Log.debug("GET {}", Response);
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- if (Response.status_code == 200)
- {
- return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
- }
-
- return {.Success = false};
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
CloudCacheResult
@@ -116,12 +113,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
cpr::Response Response = Session.Get();
m_Log.debug("GET {}", Response);
- if (Response.status_code == 200)
- {
- return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
- }
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Success = false};
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
CloudCacheResult
@@ -141,12 +136,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
cpr::Response Response = Session.Get();
m_Log.debug("GET {}", Response);
- if (Response.status_code == 200)
- {
- return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true};
- }
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {.Success = false};
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
CloudCacheResult
@@ -170,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
m_Log.debug("PUT {}", Response);
- return {.Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
CloudCacheResult
@@ -201,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
m_Log.debug("PUT {}", Response);
- return {.Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
CloudCacheResult
@@ -222,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
m_Log.debug("PUT {}", Response);
- return {.Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index f3af88e77..efe9d07ba 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -43,8 +43,10 @@ private:
struct CloudCacheResult
{
- IoBuffer Value;
- bool Success = false;
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
/**
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index f7a91acb5..9e566749a 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -17,7 +17,9 @@
#include <algorithm>
#include <atomic>
#include <deque>
+#include <limits>
#include <thread>
+#include <unordered_map>
namespace zen {
@@ -122,7 +124,10 @@ namespace detail {
Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type);
}
- return {.Value = Result.Value, .Success = Result.Success};
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -136,7 +141,11 @@ namespace detail {
{
zen::CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
- return {.Value = Result.Value, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -173,10 +182,13 @@ namespace detail {
}
}
- return {.Success = Result.Success};
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success};
}
else
{
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
+
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
CloudCacheResult Result;
@@ -185,24 +197,29 @@ namespace detail {
Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
}
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload", .Success = false};
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
}
}
+ CloudCacheResult Result;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
{
- CloudCacheResult Result;
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++)
- {
- Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- }
-
- return {.Success = Result.Success};
+ Result =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
+
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
}
catch (std::exception& e)
@@ -243,7 +260,11 @@ namespace detail {
{
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
- return {.Value = Result.Response, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -258,7 +279,11 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
- return {.Value = Result.Response, .Success = Result.Success};
+
+ return {.Value = Result.Response,
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -277,6 +302,8 @@ namespace detail {
{
zen::ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
{
@@ -289,9 +316,15 @@ namespace detail {
Payloads[Idx]);
}
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
if (!Result.Success)
{
- return {.Reason = "Failed to upload payload", .Success = false};
+ return {.Reason = "Failed to upload payload",
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = false};
}
}
@@ -301,7 +334,10 @@ namespace detail {
Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
}
- return {.Success = Result.Success};
+ TotalBytes += Result.Bytes;
+ TotalElapsedSeconds += Result.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
}
catch (std::exception& e)
{
@@ -318,6 +354,87 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
+class UpstreamStats final
+{
+ static constexpr uint64_t MaxSampleCount = 100ull;
+
+ struct StatCounters
+ {
+ int64_t Bytes = {};
+ int64_t Count = {};
+ double Seconds = {};
+ };
+
+ using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>;
+
+ struct EndpointStats
+ {
+ mutable std::mutex Lock;
+ StatsMap Counters;
+ };
+
+public:
+ UpstreamStats() : m_Log(zen::logging::Get("upstream")) {}
+
+ void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
+ {
+ std::unique_lock Lock(m_DownStats.Lock);
+
+ auto& Counters = m_DownStats.Counters[&Endpoint];
+ Counters.Bytes += Result.Bytes;
+ Counters.Seconds += Result.ElapsedSeconds;
+ Counters.Count++;
+
+ if (Counters.Count >= MaxSampleCount)
+ {
+ LogStats("STATS - (downstream):"sv, m_DownStats.Counters);
+ Counters = StatCounters{};
+ }
+ }
+
+ void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result)
+ {
+ std::unique_lock Lock(m_UpStats.Lock);
+
+ auto& Counters = m_UpStats.Counters[&Endpoint];
+ Counters.Bytes += Result.Bytes;
+ Counters.Seconds += Result.ElapsedSeconds;
+ Counters.Count++;
+
+ if (Counters.Count >= MaxSampleCount)
+ {
+ LogStats("STATS - (upstream):"sv, m_UpStats.Counters);
+ Counters = StatCounters{};
+ }
+ }
+
+private:
+ void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats)
+ {
+ for (const auto& Kv : EndpointStats)
+ {
+ const UpstreamEndpoint& Endpoint = *Kv.first;
+ const StatCounters& Counters = Kv.second;
+ const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0;
+
+ m_Log.info("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}",
+ What,
+ Kv.first->DisplayName(),
+ TotalMb,
+ Counters.Seconds,
+ TotalMb / Counters.Seconds,
+ (Counters.Seconds * 1000.0) / double(Counters.Count),
+ Counters.Count);
+ }
+ }
+
+ spdlog::logger& m_Log;
+ EndpointStats m_UpStats;
+ EndpointStats m_DownStats;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
class DefaultUpstreamCache final : public UpstreamCache
{
public:
@@ -362,6 +479,7 @@ public:
{
if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
{
+ m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -375,6 +493,7 @@ public:
{
if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
{
+ m_Stats.Add(*Endpoint, Result);
return Result;
}
}
@@ -425,7 +544,11 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ }
}
}
@@ -477,6 +600,7 @@ private:
::ZenCacheStore& m_CacheStore;
CidStore& m_CidStore;
UpstreamQueue m_UpstreamQueue;
+ UpstreamStats m_Stats;
std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
std::vector<std::thread> m_UpstreamThreads;
std::atomic_bool m_IsRunning{false};
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index d8359bc2c..327778452 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -43,13 +43,17 @@ struct GetUpstreamCacheResult
{
IoBuffer Value;
std::string Reason;
- bool Success = false;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
struct PutUpstreamCacheResult
{
std::string Reason;
- bool Success;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
/**
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 7ed4eead0..7cdaa0036 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -394,12 +394,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Get();
m_Log.debug("GET {}", Response);
- if (Response.status_code == 200)
- {
- return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
- }
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {};
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
ZenCacheResult
@@ -416,12 +414,10 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Get();
m_Log.debug("GET {}", Response);
- if (Response.status_code == 200)
- {
- return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true};
- }
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
- return {};
+ return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
ZenCacheResult
@@ -440,7 +436,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Put();
m_Log.debug("PUT {}", Response);
- return {.Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
ZenCacheResult
@@ -458,7 +454,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
m_Log.debug("PUT {}", Response);
- return {.Success = Response.status_code == 200};
+ return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 5df6da4a3..c4bff8980 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -88,7 +88,9 @@ namespace detail {
struct ZenCacheResult
{
IoBuffer Response;
- bool Success = false;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
/** Zen Structured Cache session