diff options
| author | Stefan Boberg <[email protected]> | 2021-09-08 16:14:51 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-08 16:14:51 +0200 |
| commit | 5f1146d00ff545ddb00e0f26784ee468b09b0efb (patch) | |
| tree | 49c0007460c1fc924647f1f072971c08d3956f36 | |
| parent | Changed logging defaults to be more suitable for production (diff) | |
| parent | Minor fixup and formatting. (diff) | |
| download | zen-5f1146d00ff545ddb00e0f26784ee468b09b0efb.tar.xz zen-5f1146d00ff545ddb00e0f26784ee468b09b0efb.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 33 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 6 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 162 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 8 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 20 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 4 |
6 files changed, 177 insertions, 56 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 9a013963a..815e9014e 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -80,15 +80,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke Session.SetOption(cpr::Header{{"Authorization", Auth}}); cpr::Response Response = Session.Get(); - m_Log.debug("GET {}", Response); + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - if (Response.status_code == 200) - { - return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; - } - - return {.Success = false}; + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult @@ -116,12 +113,10 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte cpr::Response Response = Session.Get(); m_Log.debug("GET {}", Response); - if (Response.status_code == 200) - { - return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; - } + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Success = false}; + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult @@ -141,12 +136,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) cpr::Response Response = Session.Get(); m_Log.debug("GET {}", Response); - if (Response.status_code == 200) - { - return {IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), true}; - } + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {.Success = false}; + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } CloudCacheResult @@ -170,7 +163,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); m_Log.debug("PUT {}", Response); - return {.Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheResult @@ -201,7 +194,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); m_Log.debug("PUT {}", Response); - return {.Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } CloudCacheResult @@ -222,7 +215,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); m_Log.debug("PUT {}", Response); - return {.Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } std::vector<IoHash> diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index f3af88e77..efe9d07ba 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -43,8 +43,10 @@ private: struct CloudCacheResult { - IoBuffer Value; - bool Success = false; + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; /** diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index f7a91acb5..9e566749a 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -17,7 +17,9 @@ #include <algorithm> #include <atomic> #include <deque> +#include <limits> #include <thread> +#include <unordered_map> namespace zen { @@ -122,7 +124,10 @@ namespace detail { Result = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, Type); } - return {.Value = Result.Value, .Success = Result.Success}; + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -136,7 +141,11 @@ namespace detail { { zen::CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - return {.Value = Result.Value, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -173,10 +182,13 @@ namespace detail { } } - return {.Success = Result.Success}; + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } else { + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; + for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { CloudCacheResult Result; @@ -185,24 +197,29 @@ namespace detail { Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } + CloudCacheResult Result; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - CloudCacheResult Result; - for (int32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) - { - Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - } - - return {.Success = Result.Success}; + Result = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } + + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } } catch (std::exception& e) @@ -243,7 +260,11 @@ namespace detail { { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -258,7 +279,11 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - return {.Value = Result.Response, .Success = Result.Success}; + + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } catch (std::exception& e) { @@ -277,6 +302,8 @@ namespace detail { { zen::ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) { @@ -289,9 +316,15 @@ namespace detail { Payloads[Idx]); } + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) { - return {.Reason = "Failed to upload payload", .Success = false}; + return {.Reason = "Failed to upload payload", + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = false}; } } @@ -301,7 +334,10 @@ namespace detail { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); } - return {.Success = Result.Success}; + TotalBytes += Result.Bytes; + TotalElapsedSeconds += Result.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; } catch (std::exception& e) { @@ -318,6 +354,87 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// +class UpstreamStats final +{ + static constexpr uint64_t MaxSampleCount = 100ull; + + struct StatCounters + { + int64_t Bytes = {}; + int64_t Count = {}; + double Seconds = {}; + }; + + using StatsMap = std::unordered_map<const UpstreamEndpoint*, StatCounters>; + + struct EndpointStats + { + mutable std::mutex Lock; + StatsMap Counters; + }; + +public: + UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} + + void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_DownStats.Lock); + + auto& Counters = m_DownStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (downstream):"sv, m_DownStats.Counters); + Counters = StatCounters{}; + } + } + + void Add(const UpstreamEndpoint& Endpoint, const PutUpstreamCacheResult& Result) + { + std::unique_lock Lock(m_UpStats.Lock); + + auto& Counters = m_UpStats.Counters[&Endpoint]; + Counters.Bytes += Result.Bytes; + Counters.Seconds += Result.ElapsedSeconds; + Counters.Count++; + + if (Counters.Count >= MaxSampleCount) + { + LogStats("STATS - (upstream):"sv, m_UpStats.Counters); + Counters = StatCounters{}; + } + } + +private: + void LogStats(std::string_view What, const std::unordered_map<const UpstreamEndpoint*, StatCounters>& EndpointStats) + { + for (const auto& Kv : EndpointStats) + { + const UpstreamEndpoint& Endpoint = *Kv.first; + const StatCounters& Counters = Kv.second; + const double TotalMb = double(Counters.Bytes) / 1024.0 / 1024.0; + + m_Log.info("{} Endpoint: {}, Bytes: {:.2f} MB, Time: {:.2f} s, Speed: {:.2f} MB/s, Avg: {:.2f} ms/request, Samples: {}", + What, + Kv.first->DisplayName(), + TotalMb, + Counters.Seconds, + TotalMb / Counters.Seconds, + (Counters.Seconds * 1000.0) / double(Counters.Count), + Counters.Count); + } + } + + spdlog::logger& m_Log; + EndpointStats m_UpStats; + EndpointStats m_DownStats; +}; + +////////////////////////////////////////////////////////////////////////// + class DefaultUpstreamCache final : public UpstreamCache { public: @@ -362,6 +479,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -375,6 +493,7 @@ public: { if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) { + m_Stats.Add(*Endpoint, Result); return Result; } } @@ -425,7 +544,11 @@ private: for (auto& Endpoint : m_Endpoints) { - Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } } } @@ -477,6 +600,7 @@ private: ::ZenCacheStore& m_CacheStore; CidStore& m_CidStore; UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; std::atomic_bool m_IsRunning{false}; diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index d8359bc2c..327778452 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -43,13 +43,17 @@ struct GetUpstreamCacheResult { IoBuffer Value; std::string Reason; - bool Success = false; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; struct PutUpstreamCacheResult { std::string Reason; - bool Success; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; /** diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 7ed4eead0..7cdaa0036 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -394,12 +394,10 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Get(); m_Log.debug("GET {}", Response); - if (Response.status_code == 200) - { - return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; - } + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {}; + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } ZenCacheResult @@ -416,12 +414,10 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Get(); m_Log.debug("GET {}", Response); - if (Response.status_code == 200) - { - return {.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()), .Success = true}; - } + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); - return {}; + return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } ZenCacheResult @@ -440,7 +436,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Put(); m_Log.debug("PUT {}", Response); - return {.Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } ZenCacheResult @@ -458,7 +454,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); m_Log.debug("PUT {}", Response); - return {.Success = Response.status_code == 200}; + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 5df6da4a3..c4bff8980 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -88,7 +88,9 @@ namespace detail { struct ZenCacheResult { IoBuffer Response; - bool Success = false; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; /** Zen Structured Cache session |