diff options
| author | Stefan Boberg <[email protected]> | 2023-10-25 12:21:51 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-10-25 12:21:51 +0200 |
| commit | 0b220255724020daea18ddb559f5edf3fdb1621b (patch) | |
| tree | 7809227752c15d90111c4e600d80b59d90ad25d2 /src | |
| parent | New rotating file logger that keeps on running regardless of errors (#495) (diff) | |
| download | zen-0b220255724020daea18ddb559f5edf3fdb1621b.tar.xz zen-0b220255724020daea18ddb559f5edf3fdb1621b.zip | |
statsd metrics reporting (#496)
added support for reporting metrics via statsd style UDP messaging, which is supported by many monitoring solution providers
this change adds reporting only of three cache related metrics (hit/miss/put) but this should be extended to include more metrics after additional evaluation
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/windows.h | 1 | ||||
| -rw-r--r-- | src/zennet-test/xmake.lua | 17 | ||||
| -rw-r--r-- | src/zennet-test/zennet-test.cpp | 52 | ||||
| -rw-r--r-- | src/zennet-test/zennet-test.h | 5 | ||||
| -rw-r--r-- | src/zennet/include/zennet/statsdclient.h | 40 | ||||
| -rw-r--r-- | src/zennet/include/zennet/zennet.h | 9 | ||||
| -rw-r--r-- | src/zennet/statsdclient.cpp | 463 | ||||
| -rw-r--r-- | src/zennet/xmake.lua | 12 | ||||
| -rw-r--r-- | src/zennet/zennet.cpp | 19 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 28 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 21 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver/config.h | 8 | ||||
| -rw-r--r-- | src/zenserver/stats/statsreporter.cpp | 60 | ||||
| -rw-r--r-- | src/zenserver/stats/statsreporter.h | 39 | ||||
| -rw-r--r-- | src/zenserver/xmake.lua | 8 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 28 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 4 |
18 files changed, 812 insertions, 14 deletions
diff --git a/src/zencore/include/zencore/windows.h b/src/zencore/include/zencore/windows.h index 6c238f845..0943a85ea 100644 --- a/src/zencore/include/zencore/windows.h +++ b/src/zencore/include/zencore/windows.h @@ -23,6 +23,7 @@ struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax erro # endif # include <windows.h> # undef GetObject +# undef SendMessage ZEN_THIRD_PARTY_INCLUDES_END diff --git a/src/zennet-test/xmake.lua b/src/zennet-test/xmake.lua new file mode 100644 index 000000000..bbede7739 --- /dev/null +++ b/src/zennet-test/xmake.lua @@ -0,0 +1,17 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target("zennet-test") + set_kind("binary") + set_group("tests") + add_headerfiles("**.h") + add_files("*.cpp") + add_deps("zencore", "zenutil", "zennet") + add_packages("vcpkg::mimalloc") + add_packages("vcpkg::catch2") + add_packages("vcpkg::doctest") + + if is_plat("macosx") then + add_ldflags("-framework CoreFoundation") + add_ldflags("-framework Security") + add_ldflags("-framework SystemConfiguration") + end diff --git a/src/zennet-test/zennet-test.cpp b/src/zennet-test/zennet-test.cpp new file mode 100644 index 000000000..b133f246c --- /dev/null +++ b/src/zennet-test/zennet-test.cpp @@ -0,0 +1,52 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zennet-test.h" +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/timer.h> +#include <zennet/zennet.h> +#include <zenutil/zenserverprocess.h> + +#if ZEN_USE_MIMALLOC +ZEN_THIRD_PARTY_INCLUDES_START +# include <mimalloc.h> +ZEN_THIRD_PARTY_INCLUDES_END +#endif + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS +# define ZEN_TEST_WITH_RUNNER 1 +# include <zencore/testing.h> +#endif + +////////////////////////////////////////////////////////////////////////// + +using namespace std::literals; + +zen::ZenServerEnvironment TestEnv; + +int +main([[maybe_unused]] int argc, [[maybe_unused]] char** argv) +{ +#if ZEN_USE_MIMALLOC + mi_version(); +#endif + +#if ZEN_WITH_TESTS + zen::zennet_forcelinktests(); + + zen::logging::InitializeLogging(); + + std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); + std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; + + TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir); + + ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir); + + return ZEN_RUN_TESTS(argc, argv); +#else + return 0; +#endif +} diff --git a/src/zennet-test/zennet-test.h b/src/zennet-test/zennet-test.h new file mode 100644 index 000000000..f84fd52ce --- /dev/null +++ b/src/zennet-test/zennet-test.h @@ -0,0 +1,5 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/zenserverprocess.h> + +extern zen::ZenServerEnvironment TestEnv; diff --git a/src/zennet/include/zennet/statsdclient.h b/src/zennet/include/zennet/statsdclient.h new file mode 100644 index 000000000..a429286ae --- /dev/null +++ b/src/zennet/include/zennet/statsdclient.h @@ -0,0 +1,40 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <memory> +#include <string_view> + +namespace zen { + +class StatsTransportBase +{ +public: + virtual ~StatsTransportBase() = 0; + virtual void SendMessage(const void* Data, size_t Size) = 0; +}; + +class StatsDaemonClient +{ +public: + virtual ~StatsDaemonClient() = 0; + + virtual void SetMessageSize(size_t MessageSize, bool UseThreads) = 0; + virtual void Flush() = 0; + + virtual void Increment(std::string_view Metric) = 0; + virtual void Decrement(std::string_view Metric) = 0; + virtual void Count(std::string_view Metric, int64_t CountDelta) = 0; + virtual void Gauge(std::string_view Metric, uint64_t CurrentValue) = 0; + virtual void Meter(std::string_view Metric, uint64_t IncrementValue) = 0; + + // Not (yet) implemented: Set,Timing +}; + +std::unique_ptr<StatsDaemonClient> CreateStatsDaemonClient(std::string_view TargetHost = "localhost", uint16_t TargetPort = 8125); + +void statsd_forcelink(); + +} // namespace zen diff --git a/src/zennet/include/zennet/zennet.h b/src/zennet/include/zennet/zennet.h new file mode 100644 index 000000000..11b368cd1 --- /dev/null +++ b/src/zennet/include/zennet/zennet.h @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +namespace zen { + +void zennet_forcelinktests(); + +} diff --git a/src/zennet/statsdclient.cpp b/src/zennet/statsdclient.cpp new file mode 100644 index 000000000..fe5ca4dda --- /dev/null +++ b/src/zennet/statsdclient.cpp @@ -0,0 +1,463 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zennet/statsdclient.h> + +#include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/thread.h> + +#include <deque> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <zencore/windows.h> +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +StatsTransportBase::~StatsTransportBase() = default; +StatsDaemonClient::~StatsDaemonClient() = default; + +////////////////////////////////////////////////////////////////////////// + +class StatsdUdpClient : public StatsTransportBase +{ + using udp = asio::ip::udp; + +public: + StatsdUdpClient(std::string_view TargetHost, uint16_t TargetPort) : m_TargetHost(TargetHost), m_TargetPort(TargetPort) + { + InitializeSocket(); + } + ~StatsdUdpClient() { Shutdown(); } + + virtual void SendMessage(const void* Data, size_t Size) override + { + if (m_Socket) + { + std::error_code Ec; + m_Socket->send_to(asio::buffer(Data, Size), m_ServerAddr, 0, Ec); + } + } + +private: + asio::io_service m_IoService; + std::string m_TargetHost; + uint16_t m_TargetPort; + + udp::endpoint m_ServerAddr; + std::unique_ptr<udp::socket> m_Socket; + + void InitializeSocket(); + void Shutdown(); +}; + +void +StatsdUdpClient::InitializeSocket() +{ + std::error_code Ec; + m_Socket = std::make_unique<udp::socket>(m_IoService); + m_Socket->open(udp::v4(), Ec); + + if (Ec) + { + ZEN_WARN("StatsdUdpClient::InitializeSocket socket creation failed: {}", Ec.message()); + + return m_Socket.reset(); + } + + udp::resolver DnsResolver(m_IoService); + udp::resolver::query DnsQuery(udp::v4(), m_TargetHost, fmt::format("{}", m_TargetPort)); + udp::resolver::iterator It = DnsResolver.resolve(DnsQuery, Ec); + + if (Ec) + { + ZEN_WARN("StatsdUdpClient::InitializeSocket resolve of '{}:{}' failed: {}", m_TargetHost, m_TargetPort, Ec.message()); + + return m_Socket.reset(); + } + + m_ServerAddr = *It; +} + +void +StatsdUdpClient::Shutdown() +{ + if (m_Socket) + { + m_Socket->shutdown(asio::socket_base::shutdown_send); + m_Socket.reset(); + } +} + +////////////////////////////////////////////////////////////////////////// + +class StatsdMemoryClient : public StatsTransportBase +{ + using MessageBuffer_t = std::vector<uint8_t>; + +public: + StatsdMemoryClient() {} + ~StatsdMemoryClient() {} + + virtual void SendMessage(const void* Data, size_t Size) override + { + RwLock::ExclusiveLockScope _(m_Lock); + m_Messages.emplace_back(reinterpret_cast<const uint8_t*>(Data), reinterpret_cast<const uint8_t*>(Data) + Size); + } + + std::deque<MessageBuffer_t> GetMessages() + { + RwLock::ExclusiveLockScope _(m_Lock); + return std::move(m_Messages); + } + +private: + RwLock m_Lock; + std::deque<MessageBuffer_t> m_Messages; +}; + +////////////////////////////////////////////////////////////////////////// + +class StatsMessageBuilder +{ + using udp = asio::ip::udp; + +public: + StatsMessageBuilder(StatsTransportBase& Transport) : m_Transport(Transport) { m_AcceptMessages.test_and_set(); } + + ~StatsMessageBuilder() + { + if (m_MessagingThread.joinable()) + { + m_AcceptMessages.clear(); + m_ShutdownRequested.Set(); + m_MessagingThread.join(); + } + else + { + FlushMessageBuffer(); + } + } + + void SetUseBackgroundThread(bool UseBackgroundThread) { m_UseBackgroundThread = UseBackgroundThread; } + + void SetMessageSize(size_t MessageSize) + { + m_MessageSize = MessageSize; + + if (m_UseBackgroundThread && !m_MessagingThread.joinable()) + { + m_MessagingThread = std::thread([this] { + SetCurrentThreadName("statsd_reporter"); + + ZEN_INFO("statsd reporting thread started"); + + bool ShutdownRequested = false; + + do + { + ShutdownRequested = m_ShutdownRequested.Wait(100); + + FlushAll(); + } while (!ShutdownRequested); + + FlushAll(); + + ZEN_INFO("statsd reporting thread exiting"); + }); + } + } + + void SendMetric(std::string_view MetricMessage) + { + if (m_AcceptMessages.test() == false) + { + return; + } + + if (MetricMessage.size() >= m_MessageSize) + { + return SendMessage(MetricMessage); + } + + RwLock::ExclusiveLockScope _(m_MessageLock); + + if ((m_MessageBuffer.size() + MetricMessage.size()) >= m_MessageSize) + { + // This will clear the message buffer + EnqueueOrSendCurrentMessage(_); + } + + if (!m_MessageBuffer.empty()) + { + m_MessageBuffer.push_back(uint8_t('\n')); + } + + m_MessageBuffer.insert(m_MessageBuffer.end(), + reinterpret_cast<const uint8_t*>(MetricMessage.data()), + reinterpret_cast<const uint8_t*>(MetricMessage.data() + MetricMessage.size())); + } + + void FlushMessageBuffer(); + void FlushQueue(); + + void FlushAll() + { + FlushQueue(); + FlushMessageBuffer(); + } + +private: + StatsTransportBase& m_Transport; + std::thread m_MessagingThread; + Event m_ShutdownRequested; + std::atomic_flag m_AcceptMessages; + RwLock m_MessageLock; + std::vector<uint8_t> m_MessageBuffer; + size_t m_MessageSize = 0; + bool m_UseBackgroundThread = true; + + std::deque<std::vector<uint8_t>> m_MessageQueue; + + void EnqueueOrSendCurrentMessage(RwLock::ExclusiveLockScope&) + { + if (m_MessagingThread.joinable()) + { + m_MessageQueue.emplace_back(std::move(m_MessageBuffer)); + m_MessageBuffer.reserve(m_MessageSize); + } + else + { + m_Transport.SendMessage(m_MessageBuffer.data(), m_MessageBuffer.size()); + m_MessageBuffer.clear(); + } + } + + void SendMessage(std::string_view Message) { m_Transport.SendMessage(Message.data(), Message.size()); } +}; + +void +StatsMessageBuilder::FlushMessageBuffer() +{ + RwLock::ExclusiveLockScope _(m_MessageLock); + + if (m_MessageBuffer.empty()) + return; + + m_Transport.SendMessage(m_MessageBuffer.data(), m_MessageBuffer.size()); + m_MessageBuffer.clear(); +} + +void +StatsMessageBuilder::FlushQueue() +{ + std::deque<std::vector<uint8_t>> Queue; + + { + RwLock::ExclusiveLockScope _(m_MessageLock); + + std::swap(Queue, m_MessageQueue); + } + + while (!Queue.empty()) + { + std::vector<uint8_t>& Message = Queue.front(); + + m_Transport.SendMessage(Message.data(), Message.size()); + + Queue.pop_front(); + } +} + +////////////////////////////////////////////////////////////////////////// + +class StatsDaemonClientImpl : public StatsDaemonClient +{ +public: + StatsDaemonClientImpl(std::unique_ptr<StatsTransportBase>&& Transport); + ~StatsDaemonClientImpl(); + + virtual void Decrement(std::string_view Metric) override; + virtual void Increment(std::string_view Metric) override; + virtual void Count(std::string_view Metric, int64_t CountDelta) override; + virtual void Gauge(std::string_view Metric, uint64_t CurrentValue) override; + virtual void Meter(std::string_view Metric, uint64_t IncrementValue) override; + virtual void SetMessageSize(size_t MessageSize, bool UseThreads) override; + virtual void Flush() override; + +private: + std::unique_ptr<StatsTransportBase> m_Transport; + StatsMessageBuilder m_MessageBuilder; + size_t m_MessageSize = 0; +}; + +std::unique_ptr<StatsDaemonClient> +CreateStatsDaemonClient(std::string_view TargetHost, uint16_t TargetPort) +{ + return std::make_unique<StatsDaemonClientImpl>(std::make_unique<StatsdUdpClient>(TargetHost, TargetPort)); +} + +std::unique_ptr<StatsDaemonClient> +CreateStatsDaemonClient(std::unique_ptr<StatsTransportBase>&& Transport) +{ + return std::make_unique<StatsDaemonClientImpl>(std::move(Transport)); +} + +////////////////////////////////////////////////////////////////////////// + +StatsDaemonClientImpl::StatsDaemonClientImpl(std::unique_ptr<StatsTransportBase>&& Transport) +: m_Transport(std::move(Transport)) +, m_MessageBuilder(*m_Transport) +{ +} + +StatsDaemonClientImpl::~StatsDaemonClientImpl() +{ +} + +void +StatsDaemonClientImpl::Increment(std::string_view Metric) +{ + ExtendableStringBuilder<128> MetricMessage; + MetricMessage << Metric << ":1|c"; + m_MessageBuilder.SendMetric(MetricMessage); +} + +void +StatsDaemonClientImpl::Decrement(std::string_view Metric) +{ + ExtendableStringBuilder<128> MetricMessage; + MetricMessage << Metric << ":-1|c"; + m_MessageBuilder.SendMetric(MetricMessage); +} + +void +StatsDaemonClientImpl::Count(std::string_view Metric, int64_t CountDelta) +{ + ExtendableStringBuilder<128> MetricMessage; + MetricMessage << Metric << ":" << CountDelta << "|c"; + m_MessageBuilder.SendMetric(MetricMessage); +} + +void +StatsDaemonClientImpl::Gauge(std::string_view Metric, uint64_t CurrentValue) +{ + ExtendableStringBuilder<128> MetricMessage; + MetricMessage << Metric << ":" << CurrentValue << "|g"; + m_MessageBuilder.SendMetric(MetricMessage); +} + +void +StatsDaemonClientImpl::Meter(std::string_view Metric, uint64_t IncrementValue) +{ + ExtendableStringBuilder<128> MetricMessage; + MetricMessage << Metric << ":" << IncrementValue << "|m"; + m_MessageBuilder.SendMetric(MetricMessage); +} + +void +StatsDaemonClientImpl::SetMessageSize(size_t MessageSize, bool UseThreads) +{ + m_MessageBuilder.SetUseBackgroundThread(UseThreads); + m_MessageBuilder.SetMessageSize(MessageSize); +} + +void +StatsDaemonClientImpl::Flush() +{ + m_MessageBuilder.FlushAll(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +statsd_forcelink() +{ +} + +TEST_CASE("zennet.statsd.emit") +{ + // auto Client = CreateStatsDaemonClient("localhost", 8125); + auto MemoryClient = std::make_unique<StatsdMemoryClient>(); + StatsdMemoryClient* RawMemoryClient = MemoryClient.get(); + auto Client = CreateStatsDaemonClient(std::move(MemoryClient)); + + Client->Count("test.counter1", 1); + Client->Count("test.counter2", 2); + Client->Meter("test.meter1", 42); + Client->Meter("test.meter2", 42); + Client->Meter("test.meter2", 42); + Client->Increment("test.count"); + Client->Decrement("test.count"); + Client->Increment("test.count"); + Client->Gauge("test.gauge", 1); + Client->Gauge("test.gauge", 99); + + const std::string_view FormattedMessages[] = {"test.counter1:1|c", + "test.counter2:2|c", + "test.meter1:42|m", + "test.meter2:42|m", + "test.meter2:42|m", + "test.count:1|c", + "test.count:-1|c", + "test.count:1|c", + "test.gauge:1|g", + "test.gauge:99|g"}; + + auto Messages = RawMemoryClient->GetMessages(); + + CHECK_EQ(Messages.size(), 10); + + for (int i = 0; i < 10; ++i) + { + std::string_view Message(reinterpret_cast<const char*>(Messages[i].data()), Messages[i].size()); + CHECK_EQ(Message, FormattedMessages[i]); + } +} + +TEST_CASE("zennet.statsd.batch") +{ + // auto Client = CreateStatsDaemonClient("localhost", 8125); + auto MemoryClient = std::make_unique<StatsdMemoryClient>(); + StatsdMemoryClient* RawMemoryClient = MemoryClient.get(); + auto Client = CreateStatsDaemonClient(std::move(MemoryClient)); + + const int MaxMessageSize = 1000; + const bool UseThreads = false; + Client->SetMessageSize(MaxMessageSize, UseThreads); + + Client->Count("test.counter1", 1); + Client->Count("test.counter2", 2); + Client->Meter("test.meter1", 42); + Client->Meter("test.meter2", 42); + Client->Meter("test.meter2", 42); + Client->Increment("test.count"); + Client->Decrement("test.count"); + Client->Increment("test.count"); + Client->Gauge("test.gauge", 1); + Client->Gauge("test.gauge", 99); + + for (int i = 0; i < 1000; ++i) + { + Client->Increment("test.count1000"); + } + + Client->Flush(); + + auto Messages = RawMemoryClient->GetMessages(); + + CHECK_EQ(Messages.size(), 20); + + for (const auto& Message : Messages) + { + CHECK(Message.size() <= MaxMessageSize); + } +} + +#endif + +} // namespace zen diff --git a/src/zennet/xmake.lua b/src/zennet/xmake.lua new file mode 100644 index 000000000..9b240e024 --- /dev/null +++ b/src/zennet/xmake.lua @@ -0,0 +1,12 @@ +-- Copyright Epic Games, Inc. All Rights Reserved. + +target('zennet') + set_kind("static") + set_group("libs") + add_headerfiles("**.h") + add_files("**.cpp") + add_includedirs("include", {public=true}) + add_deps("zencore", "zenutil") + add_packages( + "vcpkg::gsl-lite" + ) diff --git a/src/zennet/zennet.cpp b/src/zennet/zennet.cpp new file mode 100644 index 000000000..30a012a3b --- /dev/null +++ b/src/zennet/zennet.cpp @@ -0,0 +1,19 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zennet/zennet.h" + +#include <zencore/testing.h> + +#include <zennet/statsdclient.h> + +namespace zen { + +void +zennet_forcelinktests() +{ +#if ZEN_WITH_TESTS + zen::statsd_forcelink(); +#endif +} + +} // namespace zen diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index 89123a70f..6fab14eee 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -16,6 +16,7 @@ #include <zencore/timer.h> #include <zencore/trace.h> #include <zencore/workthreadpool.h> +#include <zennet/statsdclient.h> #include <zenstore/scrubcontext.h> #include <zenutil/cache/cache.h> @@ -663,7 +664,7 @@ ZenCacheStore::StorageSize() const } ZenCacheStore::CacheStoreStats -ZenCacheStore::Stats() +ZenCacheStore::Stats(bool IncludeNamespaceStats) { ZenCacheStore::CacheStoreStats Result{.HitCount = m_HitCount, .MissCount = m_MissCount, @@ -672,13 +673,32 @@ ZenCacheStore::Stats() .RejectedReadCount = m_RejectedReadCount, .PutOps = m_PutOps.Snapshot(), .GetOps = m_GetOps.Snapshot()}; - IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) { - Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()}); - }); + + if (IncludeNamespaceStats) + { + IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) { + Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()}); + }); + } + return Result; } void +ZenCacheStore::ReportMetrics(StatsDaemonClient& Statsd) +{ + const bool IncludeNamespaceStats = false; + const CacheStoreStats Now = Stats(IncludeNamespaceStats); + const CacheStoreStats& Old = m_LastReportedMetrics; + + Statsd.Meter("zen.cache_hits", Now.HitCount - Old.HitCount); + Statsd.Meter("zen.cache_misses", Now.MissCount - Old.MissCount); + Statsd.Meter("zen.cache_writes", Now.WriteCount - Old.WriteCount); + + m_LastReportedMetrics = Now; +} + +void ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig) { if (!Loggingconfig.EnableAccessLog && !Loggingconfig.EnableWriteLog) diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 02d0e31c0..dacf482d8 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -3,6 +3,7 @@ #pragma once #include "cachedisklayer.h" +#include "stats/statsreporter.h" #include <zencore/compactbinary.h> #include <zencore/iohash.h> @@ -18,6 +19,8 @@ namespace zen { +class StatsDaemonClient; + /****************************************************************************** /$$$$$$$$ /$$$$$$ /$$ @@ -126,7 +129,7 @@ private: */ -class ZenCacheStore final : public RefCounted +class ZenCacheStore final : public RefCounted, public StatsProvider { public: static constexpr std::string_view DefaultNamespace = @@ -161,11 +164,11 @@ public: struct CacheStoreStats { - uint64_t HitCount; - uint64_t MissCount; - uint64_t WriteCount; - uint64_t RejectedWriteCount; - uint64_t RejectedReadCount; + uint64_t HitCount = 0; + uint64_t MissCount = 0; + uint64_t WriteCount = 0; + uint64_t RejectedWriteCount = 0; + uint64_t RejectedReadCount = 0; metrics::RequestStatsSnapshot PutOps; metrics::RequestStatsSnapshot GetOps; std::vector<NamedNamespaceStats> NamespaceStats; @@ -199,7 +202,7 @@ public: const std::string_view ValueFilter) const; GcStorageSize StorageSize() const; - CacheStoreStats Stats(); + CacheStoreStats Stats(bool IncludeNamespaceStats = true); Configuration GetConfiguration() const { return m_Configuration; } void SetLoggingConfig(const Configuration::LogConfig& Loggingconfig); @@ -212,6 +215,9 @@ public: std::string_view Bucket, std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn); + // StatsProvider + virtual void ReportMetrics(StatsDaemonClient& Statsd) override; + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); @@ -219,6 +225,7 @@ private: typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap; + CacheStoreStats m_LastReportedMetrics; const DiskWriteBlocker* m_DiskWriteBlocker = nullptr; mutable RwLock m_NamespacesLock; NamespaceMap m_Namespaces; diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 81b1dcfe2..ca438fe38 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -799,6 +799,11 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("trace.host"sv, ServerOptions.TraceHost, "tracehost"sv); LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv); + ////// stats + LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled); + LuaOptions.AddOption("stats.host"sv, ServerOptions.StatsConfig.StatsdHost); + LuaOptions.AddOption("stats.port"sv, ServerOptions.StatsConfig.StatsdPort); + ////// cache LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled); LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"); @@ -1299,6 +1304,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<std::vector<std::string>>(BucketConfigs), ""); + options.add_option("stats", + "", + "statsd", + "", + cxxopts::value<bool>(ServerOptions.StatsConfig.Enabled)->default_value("false"), + "Enable statsd reporter (localhost:8125)"); + try { auto result = options.parse(argc, argv); diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 7743e536f..a1e091665 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -96,6 +96,13 @@ struct ZenObjectStoreConfig std::vector<BucketConfig> Buckets; }; +struct ZenStatsConfig +{ + bool Enabled = false; + std::string StatsdHost = "localhost"; + int StatsdPort = 8125; +}; + struct ZenStructuredCacheConfig { bool Enabled = true; @@ -116,6 +123,7 @@ struct ZenServerOptions ZenObjectStoreConfig ObjectStoreConfig; zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; + ZenStatsConfig StatsConfig; std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) std::filesystem::path AbsLogFile; // Absolute path to main log file diff --git a/src/zenserver/stats/statsreporter.cpp b/src/zenserver/stats/statsreporter.cpp new file mode 100644 index 000000000..5d5ef4bfa --- /dev/null +++ b/src/zenserver/stats/statsreporter.cpp @@ -0,0 +1,60 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "statsreporter.h" + +#include <zencore/logging.h> +#include <zennet/statsdclient.h> + +namespace zen { + +StatsReporter::StatsReporter() +{ +} + +StatsReporter::~StatsReporter() +{ +} + +void +StatsReporter::Initialize(const ZenStatsConfig& Config) +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (Config.Enabled) + { + ZEN_INFO("initializing stats reporter: {}:{}", Config.StatsdHost, Config.StatsdPort) + m_Statsd = CreateStatsDaemonClient(Config.StatsdHost, gsl::narrow<uint16_t>(Config.StatsdPort)); + m_Statsd->SetMessageSize(1500, false); + } +} + +void +StatsReporter::Shutdown() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Statsd.reset(); +} + +void +StatsReporter::AddProvider(StatsProvider* Provider) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Providers.push_back(Provider); +} + +void +StatsReporter::ReportStats() +{ + RwLock::ExclusiveLockScope _(m_Lock); + if (m_Statsd) + { + for (StatsProvider* Provider : m_Providers) + { + Provider->ReportMetrics(*m_Statsd); + } + + m_Statsd->Flush(); + } +} + +} // namespace zen diff --git a/src/zenserver/stats/statsreporter.h b/src/zenserver/stats/statsreporter.h new file mode 100644 index 000000000..ed6ec6c55 --- /dev/null +++ b/src/zenserver/stats/statsreporter.h @@ -0,0 +1,39 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "config.h" + +#include <zencore/thread.h> + +namespace zen { + +class StatsDaemonClient; + +class StatsProvider +{ +public: + virtual void ReportMetrics(StatsDaemonClient& Statsd) = 0; +}; + +class StatsReporter +{ +public: + StatsReporter(); + ~StatsReporter(); + + StatsReporter& operator=(const StatsReporter&) = delete; + StatsReporter(const StatsReporter&) = delete; + + void Initialize(const ZenStatsConfig& Config); + void Shutdown(); + void AddProvider(StatsProvider* Provider); + void ReportStats(); + +private: + RwLock m_Lock; + std::unique_ptr<StatsDaemonClient> m_Statsd; + std::vector<StatsProvider*> m_Providers; +}; + +} // namespace zen diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index 329435acb..123690be2 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -2,8 +2,12 @@ target("zenserver") set_kind("binary") - add_deps("zencore", "zenhttp", "zenstore", "zenutil") - add_deps("zenvfs") + add_deps("zencore", + "zenhttp", + "zennet", + "zenstore", + "zenutil", + "zenvfs") add_headerfiles("**.h") add_files("**.cpp") add_files("zenserver.cpp", {unity_ignored = true }) diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index e5346abf0..d7fc2d069 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -41,7 +41,6 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> #include <asio.hpp> -#include <lua.hpp> ZEN_THIRD_PARTY_INCLUDES_END #include <exception> @@ -197,7 +196,14 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_AuthService); m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_StatsReporter.Initialize(ServerOptions.StatsConfig); + if (ServerOptions.StatsConfig.Enabled) + { + EnqueueStatsReportingTimer(); + } + m_Http->RegisterService(m_StatusService); m_StatusService.RegisterHandler("status", *this); @@ -260,10 +266,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_ObjStoreService); } +#if ZEN_WITH_VFS m_VfsService = std::make_unique<VfsService>(); m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); m_Http->RegisterService(*m_VfsService); +#endif ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}", ServerOptions.GcConfig.Enabled, @@ -514,6 +522,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) m_Http->RegisterService(*m_StructuredCacheService); m_Http->RegisterService(*m_UpstreamService); + + m_StatsReporter.AddProvider(m_CacheStore.Get()); } void @@ -598,6 +608,8 @@ ZenServer::Cleanup() m_JobQueue->Stop(); } + m_StatsReporter.Shutdown(); + m_GcScheduler.Shutdown(); m_AdminService.reset(); m_VfsService.reset(); @@ -661,6 +673,20 @@ ZenServer::EnqueueSigIntTimer() } void +ZenServer::EnqueueStatsReportingTimer() +{ + m_StatsReportingTimer.expires_after(std::chrono::milliseconds(500)); + m_StatsReportingTimer.async_wait([this](const asio::error_code& Ec) { + if (!Ec) + { + m_StatsReporter.ReportStats(); + EnqueueStatsReportingTimer(); + } + }); + EnsureIoRunner(); +} + +void ZenServer::CheckStateMarker() { std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 25e45ccd0..de069be69 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "objectstore/objectstore.h" #include "projectstore/httpprojectstore.h" #include "projectstore/projectstore.h" +#include "stats/statsreporter.h" #include "upstream/upstream.h" #include "vfs/vfsservice.h" @@ -75,6 +76,7 @@ public: void EnqueueTimer(); void EnqueueStateMarkerTimer(); void EnqueueSigIntTimer(); + void EnqueueStatsReportingTimer(); void CheckStateMarker(); void CheckSigInt(); void CheckOwnerPid(); @@ -96,6 +98,7 @@ private: asio::steady_timer m_PidCheckTimer{m_IoContext}; asio::steady_timer m_StateMakerTimer{m_IoContext}; asio::steady_timer m_SigIntTimer{m_IoContext}; + asio::steady_timer m_StatsReportingTimer{m_IoContext}; ProcessMonitor m_ProcessMonitor; NamedMutex m_ServerMutex; @@ -109,6 +112,7 @@ private: inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; } static std::string_view ToString(ServerState Value); + StatsReporter m_StatsReporter; Ref<HttpServer> m_Http; std::unique_ptr<AuthMgr> m_AuthMgr; std::unique_ptr<HttpAuthService> m_AuthService; |