aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-10-25 12:21:51 +0200
committerGitHub <[email protected]>2023-10-25 12:21:51 +0200
commit0b220255724020daea18ddb559f5edf3fdb1621b (patch)
tree7809227752c15d90111c4e600d80b59d90ad25d2 /src
parentNew rotating file logger that keeps on running regardless of errors (#495) (diff)
downloadzen-0b220255724020daea18ddb559f5edf3fdb1621b.tar.xz
zen-0b220255724020daea18ddb559f5edf3fdb1621b.zip
statsd metrics reporting (#496)
added support for reporting metrics via statsd style UDP messaging, which is supported by many monitoring solution providers this change adds reporting only of three cache related metrics (hit/miss/put) but this should be extended to include more metrics after additional evaluation
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/windows.h1
-rw-r--r--src/zennet-test/xmake.lua17
-rw-r--r--src/zennet-test/zennet-test.cpp52
-rw-r--r--src/zennet-test/zennet-test.h5
-rw-r--r--src/zennet/include/zennet/statsdclient.h40
-rw-r--r--src/zennet/include/zennet/zennet.h9
-rw-r--r--src/zennet/statsdclient.cpp463
-rw-r--r--src/zennet/xmake.lua12
-rw-r--r--src/zennet/zennet.cpp19
-rw-r--r--src/zenserver/cache/structuredcachestore.cpp28
-rw-r--r--src/zenserver/cache/structuredcachestore.h21
-rw-r--r--src/zenserver/config.cpp12
-rw-r--r--src/zenserver/config.h8
-rw-r--r--src/zenserver/stats/statsreporter.cpp60
-rw-r--r--src/zenserver/stats/statsreporter.h39
-rw-r--r--src/zenserver/xmake.lua8
-rw-r--r--src/zenserver/zenserver.cpp28
-rw-r--r--src/zenserver/zenserver.h4
18 files changed, 812 insertions, 14 deletions
diff --git a/src/zencore/include/zencore/windows.h b/src/zencore/include/zencore/windows.h
index 6c238f845..0943a85ea 100644
--- a/src/zencore/include/zencore/windows.h
+++ b/src/zencore/include/zencore/windows.h
@@ -23,6 +23,7 @@ struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax erro
# endif
# include <windows.h>
# undef GetObject
+# undef SendMessage
ZEN_THIRD_PARTY_INCLUDES_END
diff --git a/src/zennet-test/xmake.lua b/src/zennet-test/xmake.lua
new file mode 100644
index 000000000..bbede7739
--- /dev/null
+++ b/src/zennet-test/xmake.lua
@@ -0,0 +1,17 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+target("zennet-test")
+ set_kind("binary")
+ set_group("tests")
+ add_headerfiles("**.h")
+ add_files("*.cpp")
+ add_deps("zencore", "zenutil", "zennet")
+ add_packages("vcpkg::mimalloc")
+ add_packages("vcpkg::catch2")
+ add_packages("vcpkg::doctest")
+
+ if is_plat("macosx") then
+ add_ldflags("-framework CoreFoundation")
+ add_ldflags("-framework Security")
+ add_ldflags("-framework SystemConfiguration")
+ end
diff --git a/src/zennet-test/zennet-test.cpp b/src/zennet-test/zennet-test.cpp
new file mode 100644
index 000000000..b133f246c
--- /dev/null
+++ b/src/zennet-test/zennet-test.cpp
@@ -0,0 +1,52 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zennet-test.h"
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/timer.h>
+#include <zennet/zennet.h>
+#include <zenutil/zenserverprocess.h>
+
+#if ZEN_USE_MIMALLOC
+ZEN_THIRD_PARTY_INCLUDES_START
+# include <mimalloc.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+# define ZEN_TEST_WITH_RUNNER 1
+# include <zencore/testing.h>
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+using namespace std::literals;
+
+zen::ZenServerEnvironment TestEnv;
+
+int
+main([[maybe_unused]] int argc, [[maybe_unused]] char** argv)
+{
+#if ZEN_USE_MIMALLOC
+ mi_version();
+#endif
+
+#if ZEN_WITH_TESTS
+ zen::zennet_forcelinktests();
+
+ zen::logging::InitializeLogging();
+
+ std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path();
+ std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test";
+
+ TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir);
+
+ ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);
+
+ return ZEN_RUN_TESTS(argc, argv);
+#else
+ return 0;
+#endif
+}
diff --git a/src/zennet-test/zennet-test.h b/src/zennet-test/zennet-test.h
new file mode 100644
index 000000000..f84fd52ce
--- /dev/null
+++ b/src/zennet-test/zennet-test.h
@@ -0,0 +1,5 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/zenserverprocess.h>
+
+extern zen::ZenServerEnvironment TestEnv;
diff --git a/src/zennet/include/zennet/statsdclient.h b/src/zennet/include/zennet/statsdclient.h
new file mode 100644
index 000000000..a429286ae
--- /dev/null
+++ b/src/zennet/include/zennet/statsdclient.h
@@ -0,0 +1,40 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/zencore.h>
+
+#include <memory>
+#include <string_view>
+
+namespace zen {
+
+class StatsTransportBase
+{
+public:
+ virtual ~StatsTransportBase() = 0;
+ virtual void SendMessage(const void* Data, size_t Size) = 0;
+};
+
+class StatsDaemonClient
+{
+public:
+ virtual ~StatsDaemonClient() = 0;
+
+ virtual void SetMessageSize(size_t MessageSize, bool UseThreads) = 0;
+ virtual void Flush() = 0;
+
+ virtual void Increment(std::string_view Metric) = 0;
+ virtual void Decrement(std::string_view Metric) = 0;
+ virtual void Count(std::string_view Metric, int64_t CountDelta) = 0;
+ virtual void Gauge(std::string_view Metric, uint64_t CurrentValue) = 0;
+ virtual void Meter(std::string_view Metric, uint64_t IncrementValue) = 0;
+
+ // Not (yet) implemented: Set,Timing
+};
+
+std::unique_ptr<StatsDaemonClient> CreateStatsDaemonClient(std::string_view TargetHost = "localhost", uint16_t TargetPort = 8125);
+
+void statsd_forcelink();
+
+} // namespace zen
diff --git a/src/zennet/include/zennet/zennet.h b/src/zennet/include/zennet/zennet.h
new file mode 100644
index 000000000..11b368cd1
--- /dev/null
+++ b/src/zennet/include/zennet/zennet.h
@@ -0,0 +1,9 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+namespace zen {
+
+void zennet_forcelinktests();
+
+}
diff --git a/src/zennet/statsdclient.cpp b/src/zennet/statsdclient.cpp
new file mode 100644
index 000000000..fe5ca4dda
--- /dev/null
+++ b/src/zennet/statsdclient.cpp
@@ -0,0 +1,463 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zennet/statsdclient.h>
+
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include <zencore/testing.h>
+#include <zencore/thread.h>
+
+#include <deque>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <zencore/windows.h>
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+StatsTransportBase::~StatsTransportBase() = default;
+StatsDaemonClient::~StatsDaemonClient() = default;
+
+//////////////////////////////////////////////////////////////////////////
+
+class StatsdUdpClient : public StatsTransportBase
+{
+ using udp = asio::ip::udp;
+
+public:
+ StatsdUdpClient(std::string_view TargetHost, uint16_t TargetPort) : m_TargetHost(TargetHost), m_TargetPort(TargetPort)
+ {
+ InitializeSocket();
+ }
+ ~StatsdUdpClient() { Shutdown(); }
+
+ virtual void SendMessage(const void* Data, size_t Size) override
+ {
+ if (m_Socket)
+ {
+ std::error_code Ec;
+ m_Socket->send_to(asio::buffer(Data, Size), m_ServerAddr, 0, Ec);
+ }
+ }
+
+private:
+ asio::io_service m_IoService;
+ std::string m_TargetHost;
+ uint16_t m_TargetPort;
+
+ udp::endpoint m_ServerAddr;
+ std::unique_ptr<udp::socket> m_Socket;
+
+ void InitializeSocket();
+ void Shutdown();
+};
+
+void
+StatsdUdpClient::InitializeSocket()
+{
+ std::error_code Ec;
+ m_Socket = std::make_unique<udp::socket>(m_IoService);
+ m_Socket->open(udp::v4(), Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("StatsdUdpClient::InitializeSocket socket creation failed: {}", Ec.message());
+
+ return m_Socket.reset();
+ }
+
+ udp::resolver DnsResolver(m_IoService);
+ udp::resolver::query DnsQuery(udp::v4(), m_TargetHost, fmt::format("{}", m_TargetPort));
+ udp::resolver::iterator It = DnsResolver.resolve(DnsQuery, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("StatsdUdpClient::InitializeSocket resolve of '{}:{}' failed: {}", m_TargetHost, m_TargetPort, Ec.message());
+
+ return m_Socket.reset();
+ }
+
+ m_ServerAddr = *It;
+}
+
+void
+StatsdUdpClient::Shutdown()
+{
+ if (m_Socket)
+ {
+ m_Socket->shutdown(asio::socket_base::shutdown_send);
+ m_Socket.reset();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class StatsdMemoryClient : public StatsTransportBase
+{
+ using MessageBuffer_t = std::vector<uint8_t>;
+
+public:
+ StatsdMemoryClient() {}
+ ~StatsdMemoryClient() {}
+
+ virtual void SendMessage(const void* Data, size_t Size) override
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Messages.emplace_back(reinterpret_cast<const uint8_t*>(Data), reinterpret_cast<const uint8_t*>(Data) + Size);
+ }
+
+ std::deque<MessageBuffer_t> GetMessages()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ return std::move(m_Messages);
+ }
+
+private:
+ RwLock m_Lock;
+ std::deque<MessageBuffer_t> m_Messages;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+class StatsMessageBuilder
+{
+ using udp = asio::ip::udp;
+
+public:
+ StatsMessageBuilder(StatsTransportBase& Transport) : m_Transport(Transport) { m_AcceptMessages.test_and_set(); }
+
+ ~StatsMessageBuilder()
+ {
+ if (m_MessagingThread.joinable())
+ {
+ m_AcceptMessages.clear();
+ m_ShutdownRequested.Set();
+ m_MessagingThread.join();
+ }
+ else
+ {
+ FlushMessageBuffer();
+ }
+ }
+
+ void SetUseBackgroundThread(bool UseBackgroundThread) { m_UseBackgroundThread = UseBackgroundThread; }
+
+ void SetMessageSize(size_t MessageSize)
+ {
+ m_MessageSize = MessageSize;
+
+ if (m_UseBackgroundThread && !m_MessagingThread.joinable())
+ {
+ m_MessagingThread = std::thread([this] {
+ SetCurrentThreadName("statsd_reporter");
+
+ ZEN_INFO("statsd reporting thread started");
+
+ bool ShutdownRequested = false;
+
+ do
+ {
+ ShutdownRequested = m_ShutdownRequested.Wait(100);
+
+ FlushAll();
+ } while (!ShutdownRequested);
+
+ FlushAll();
+
+ ZEN_INFO("statsd reporting thread exiting");
+ });
+ }
+ }
+
+ void SendMetric(std::string_view MetricMessage)
+ {
+ if (m_AcceptMessages.test() == false)
+ {
+ return;
+ }
+
+ if (MetricMessage.size() >= m_MessageSize)
+ {
+ return SendMessage(MetricMessage);
+ }
+
+ RwLock::ExclusiveLockScope _(m_MessageLock);
+
+ if ((m_MessageBuffer.size() + MetricMessage.size()) >= m_MessageSize)
+ {
+ // This will clear the message buffer
+ EnqueueOrSendCurrentMessage(_);
+ }
+
+ if (!m_MessageBuffer.empty())
+ {
+ m_MessageBuffer.push_back(uint8_t('\n'));
+ }
+
+ m_MessageBuffer.insert(m_MessageBuffer.end(),
+ reinterpret_cast<const uint8_t*>(MetricMessage.data()),
+ reinterpret_cast<const uint8_t*>(MetricMessage.data() + MetricMessage.size()));
+ }
+
+ void FlushMessageBuffer();
+ void FlushQueue();
+
+ void FlushAll()
+ {
+ FlushQueue();
+ FlushMessageBuffer();
+ }
+
+private:
+ StatsTransportBase& m_Transport;
+ std::thread m_MessagingThread;
+ Event m_ShutdownRequested;
+ std::atomic_flag m_AcceptMessages;
+ RwLock m_MessageLock;
+ std::vector<uint8_t> m_MessageBuffer;
+ size_t m_MessageSize = 0;
+ bool m_UseBackgroundThread = true;
+
+ std::deque<std::vector<uint8_t>> m_MessageQueue;
+
+ void EnqueueOrSendCurrentMessage(RwLock::ExclusiveLockScope&)
+ {
+ if (m_MessagingThread.joinable())
+ {
+ m_MessageQueue.emplace_back(std::move(m_MessageBuffer));
+ m_MessageBuffer.reserve(m_MessageSize);
+ }
+ else
+ {
+ m_Transport.SendMessage(m_MessageBuffer.data(), m_MessageBuffer.size());
+ m_MessageBuffer.clear();
+ }
+ }
+
+ void SendMessage(std::string_view Message) { m_Transport.SendMessage(Message.data(), Message.size()); }
+};
+
+void
+StatsMessageBuilder::FlushMessageBuffer()
+{
+ RwLock::ExclusiveLockScope _(m_MessageLock);
+
+ if (m_MessageBuffer.empty())
+ return;
+
+ m_Transport.SendMessage(m_MessageBuffer.data(), m_MessageBuffer.size());
+ m_MessageBuffer.clear();
+}
+
+void
+StatsMessageBuilder::FlushQueue()
+{
+ std::deque<std::vector<uint8_t>> Queue;
+
+ {
+ RwLock::ExclusiveLockScope _(m_MessageLock);
+
+ std::swap(Queue, m_MessageQueue);
+ }
+
+ while (!Queue.empty())
+ {
+ std::vector<uint8_t>& Message = Queue.front();
+
+ m_Transport.SendMessage(Message.data(), Message.size());
+
+ Queue.pop_front();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class StatsDaemonClientImpl : public StatsDaemonClient
+{
+public:
+ StatsDaemonClientImpl(std::unique_ptr<StatsTransportBase>&& Transport);
+ ~StatsDaemonClientImpl();
+
+ virtual void Decrement(std::string_view Metric) override;
+ virtual void Increment(std::string_view Metric) override;
+ virtual void Count(std::string_view Metric, int64_t CountDelta) override;
+ virtual void Gauge(std::string_view Metric, uint64_t CurrentValue) override;
+ virtual void Meter(std::string_view Metric, uint64_t IncrementValue) override;
+ virtual void SetMessageSize(size_t MessageSize, bool UseThreads) override;
+ virtual void Flush() override;
+
+private:
+ std::unique_ptr<StatsTransportBase> m_Transport;
+ StatsMessageBuilder m_MessageBuilder;
+ size_t m_MessageSize = 0;
+};
+
+std::unique_ptr<StatsDaemonClient>
+CreateStatsDaemonClient(std::string_view TargetHost, uint16_t TargetPort)
+{
+ return std::make_unique<StatsDaemonClientImpl>(std::make_unique<StatsdUdpClient>(TargetHost, TargetPort));
+}
+
+std::unique_ptr<StatsDaemonClient>
+CreateStatsDaemonClient(std::unique_ptr<StatsTransportBase>&& Transport)
+{
+ return std::make_unique<StatsDaemonClientImpl>(std::move(Transport));
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+StatsDaemonClientImpl::StatsDaemonClientImpl(std::unique_ptr<StatsTransportBase>&& Transport)
+: m_Transport(std::move(Transport))
+, m_MessageBuilder(*m_Transport)
+{
+}
+
+StatsDaemonClientImpl::~StatsDaemonClientImpl()
+{
+}
+
+void
+StatsDaemonClientImpl::Increment(std::string_view Metric)
+{
+ ExtendableStringBuilder<128> MetricMessage;
+ MetricMessage << Metric << ":1|c";
+ m_MessageBuilder.SendMetric(MetricMessage);
+}
+
+void
+StatsDaemonClientImpl::Decrement(std::string_view Metric)
+{
+ ExtendableStringBuilder<128> MetricMessage;
+ MetricMessage << Metric << ":-1|c";
+ m_MessageBuilder.SendMetric(MetricMessage);
+}
+
+void
+StatsDaemonClientImpl::Count(std::string_view Metric, int64_t CountDelta)
+{
+ ExtendableStringBuilder<128> MetricMessage;
+ MetricMessage << Metric << ":" << CountDelta << "|c";
+ m_MessageBuilder.SendMetric(MetricMessage);
+}
+
+void
+StatsDaemonClientImpl::Gauge(std::string_view Metric, uint64_t CurrentValue)
+{
+ ExtendableStringBuilder<128> MetricMessage;
+ MetricMessage << Metric << ":" << CurrentValue << "|g";
+ m_MessageBuilder.SendMetric(MetricMessage);
+}
+
+void
+StatsDaemonClientImpl::Meter(std::string_view Metric, uint64_t IncrementValue)
+{
+ ExtendableStringBuilder<128> MetricMessage;
+ MetricMessage << Metric << ":" << IncrementValue << "|m";
+ m_MessageBuilder.SendMetric(MetricMessage);
+}
+
+void
+StatsDaemonClientImpl::SetMessageSize(size_t MessageSize, bool UseThreads)
+{
+ m_MessageBuilder.SetUseBackgroundThread(UseThreads);
+ m_MessageBuilder.SetMessageSize(MessageSize);
+}
+
+void
+StatsDaemonClientImpl::Flush()
+{
+ m_MessageBuilder.FlushAll();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+#if ZEN_WITH_TESTS
+
+void
+statsd_forcelink()
+{
+}
+
+TEST_CASE("zennet.statsd.emit")
+{
+ // auto Client = CreateStatsDaemonClient("localhost", 8125);
+ auto MemoryClient = std::make_unique<StatsdMemoryClient>();
+ StatsdMemoryClient* RawMemoryClient = MemoryClient.get();
+ auto Client = CreateStatsDaemonClient(std::move(MemoryClient));
+
+ Client->Count("test.counter1", 1);
+ Client->Count("test.counter2", 2);
+ Client->Meter("test.meter1", 42);
+ Client->Meter("test.meter2", 42);
+ Client->Meter("test.meter2", 42);
+ Client->Increment("test.count");
+ Client->Decrement("test.count");
+ Client->Increment("test.count");
+ Client->Gauge("test.gauge", 1);
+ Client->Gauge("test.gauge", 99);
+
+ const std::string_view FormattedMessages[] = {"test.counter1:1|c",
+ "test.counter2:2|c",
+ "test.meter1:42|m",
+ "test.meter2:42|m",
+ "test.meter2:42|m",
+ "test.count:1|c",
+ "test.count:-1|c",
+ "test.count:1|c",
+ "test.gauge:1|g",
+ "test.gauge:99|g"};
+
+ auto Messages = RawMemoryClient->GetMessages();
+
+ CHECK_EQ(Messages.size(), 10);
+
+ for (int i = 0; i < 10; ++i)
+ {
+ std::string_view Message(reinterpret_cast<const char*>(Messages[i].data()), Messages[i].size());
+ CHECK_EQ(Message, FormattedMessages[i]);
+ }
+}
+
+TEST_CASE("zennet.statsd.batch")
+{
+ // auto Client = CreateStatsDaemonClient("localhost", 8125);
+ auto MemoryClient = std::make_unique<StatsdMemoryClient>();
+ StatsdMemoryClient* RawMemoryClient = MemoryClient.get();
+ auto Client = CreateStatsDaemonClient(std::move(MemoryClient));
+
+ const int MaxMessageSize = 1000;
+ const bool UseThreads = false;
+ Client->SetMessageSize(MaxMessageSize, UseThreads);
+
+ Client->Count("test.counter1", 1);
+ Client->Count("test.counter2", 2);
+ Client->Meter("test.meter1", 42);
+ Client->Meter("test.meter2", 42);
+ Client->Meter("test.meter2", 42);
+ Client->Increment("test.count");
+ Client->Decrement("test.count");
+ Client->Increment("test.count");
+ Client->Gauge("test.gauge", 1);
+ Client->Gauge("test.gauge", 99);
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ Client->Increment("test.count1000");
+ }
+
+ Client->Flush();
+
+ auto Messages = RawMemoryClient->GetMessages();
+
+ CHECK_EQ(Messages.size(), 20);
+
+ for (const auto& Message : Messages)
+ {
+ CHECK(Message.size() <= MaxMessageSize);
+ }
+}
+
+#endif
+
+} // namespace zen
diff --git a/src/zennet/xmake.lua b/src/zennet/xmake.lua
new file mode 100644
index 000000000..9b240e024
--- /dev/null
+++ b/src/zennet/xmake.lua
@@ -0,0 +1,12 @@
+-- Copyright Epic Games, Inc. All Rights Reserved.
+
+target('zennet')
+ set_kind("static")
+ set_group("libs")
+ add_headerfiles("**.h")
+ add_files("**.cpp")
+ add_includedirs("include", {public=true})
+ add_deps("zencore", "zenutil")
+ add_packages(
+ "vcpkg::gsl-lite"
+ )
diff --git a/src/zennet/zennet.cpp b/src/zennet/zennet.cpp
new file mode 100644
index 000000000..30a012a3b
--- /dev/null
+++ b/src/zennet/zennet.cpp
@@ -0,0 +1,19 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zennet/zennet.h"
+
+#include <zencore/testing.h>
+
+#include <zennet/statsdclient.h>
+
+namespace zen {
+
+void
+zennet_forcelinktests()
+{
+#if ZEN_WITH_TESTS
+ zen::statsd_forcelink();
+#endif
+}
+
+} // namespace zen
diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp
index 89123a70f..6fab14eee 100644
--- a/src/zenserver/cache/structuredcachestore.cpp
+++ b/src/zenserver/cache/structuredcachestore.cpp
@@ -16,6 +16,7 @@
#include <zencore/timer.h>
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
+#include <zennet/statsdclient.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/cache/cache.h>
@@ -663,7 +664,7 @@ ZenCacheStore::StorageSize() const
}
ZenCacheStore::CacheStoreStats
-ZenCacheStore::Stats()
+ZenCacheStore::Stats(bool IncludeNamespaceStats)
{
ZenCacheStore::CacheStoreStats Result{.HitCount = m_HitCount,
.MissCount = m_MissCount,
@@ -672,13 +673,32 @@ ZenCacheStore::Stats()
.RejectedReadCount = m_RejectedReadCount,
.PutOps = m_PutOps.Snapshot(),
.GetOps = m_GetOps.Snapshot()};
- IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) {
- Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()});
- });
+
+ if (IncludeNamespaceStats)
+ {
+ IterateNamespaces([&](std::string_view NamespaceName, ZenCacheNamespace& Store) {
+ Result.NamespaceStats.emplace_back(NamedNamespaceStats{.NamespaceName = std::string(NamespaceName), .Stats = Store.Stats()});
+ });
+ }
+
return Result;
}
void
+ZenCacheStore::ReportMetrics(StatsDaemonClient& Statsd)
+{
+ const bool IncludeNamespaceStats = false;
+ const CacheStoreStats Now = Stats(IncludeNamespaceStats);
+ const CacheStoreStats& Old = m_LastReportedMetrics;
+
+ Statsd.Meter("zen.cache_hits", Now.HitCount - Old.HitCount);
+ Statsd.Meter("zen.cache_misses", Now.MissCount - Old.MissCount);
+ Statsd.Meter("zen.cache_writes", Now.WriteCount - Old.WriteCount);
+
+ m_LastReportedMetrics = Now;
+}
+
+void
ZenCacheStore::SetLoggingConfig(const Configuration::LogConfig& Loggingconfig)
{
if (!Loggingconfig.EnableAccessLog && !Loggingconfig.EnableWriteLog)
diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h
index 02d0e31c0..dacf482d8 100644
--- a/src/zenserver/cache/structuredcachestore.h
+++ b/src/zenserver/cache/structuredcachestore.h
@@ -3,6 +3,7 @@
#pragma once
#include "cachedisklayer.h"
+#include "stats/statsreporter.h"
#include <zencore/compactbinary.h>
#include <zencore/iohash.h>
@@ -18,6 +19,8 @@
namespace zen {
+class StatsDaemonClient;
+
/******************************************************************************
/$$$$$$$$ /$$$$$$ /$$
@@ -126,7 +129,7 @@ private:
*/
-class ZenCacheStore final : public RefCounted
+class ZenCacheStore final : public RefCounted, public StatsProvider
{
public:
static constexpr std::string_view DefaultNamespace =
@@ -161,11 +164,11 @@ public:
struct CacheStoreStats
{
- uint64_t HitCount;
- uint64_t MissCount;
- uint64_t WriteCount;
- uint64_t RejectedWriteCount;
- uint64_t RejectedReadCount;
+ uint64_t HitCount = 0;
+ uint64_t MissCount = 0;
+ uint64_t WriteCount = 0;
+ uint64_t RejectedWriteCount = 0;
+ uint64_t RejectedReadCount = 0;
metrics::RequestStatsSnapshot PutOps;
metrics::RequestStatsSnapshot GetOps;
std::vector<NamedNamespaceStats> NamespaceStats;
@@ -199,7 +202,7 @@ public:
const std::string_view ValueFilter) const;
GcStorageSize StorageSize() const;
- CacheStoreStats Stats();
+ CacheStoreStats Stats(bool IncludeNamespaceStats = true);
Configuration GetConfiguration() const { return m_Configuration; }
void SetLoggingConfig(const Configuration::LogConfig& Loggingconfig);
@@ -212,6 +215,9 @@ public:
std::string_view Bucket,
std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn);
+ // StatsProvider
+ virtual void ReportMetrics(StatsDaemonClient& Statsd) override;
+
private:
const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const;
ZenCacheNamespace* GetNamespace(std::string_view Namespace);
@@ -219,6 +225,7 @@ private:
typedef std::unordered_map<std::string, std::unique_ptr<ZenCacheNamespace>> NamespaceMap;
+ CacheStoreStats m_LastReportedMetrics;
const DiskWriteBlocker* m_DiskWriteBlocker = nullptr;
mutable RwLock m_NamespacesLock;
NamespaceMap m_Namespaces;
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 81b1dcfe2..ca438fe38 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -799,6 +799,11 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("trace.host"sv, ServerOptions.TraceHost, "tracehost"sv);
LuaOptions.AddOption("trace.file"sv, ServerOptions.TraceFile, "tracefile"sv);
+ ////// stats
+ LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled);
+ LuaOptions.AddOption("stats.host"sv, ServerOptions.StatsConfig.StatsdHost);
+ LuaOptions.AddOption("stats.port"sv, ServerOptions.StatsConfig.StatsdPort);
+
////// cache
LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled);
LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log");
@@ -1299,6 +1304,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<std::vector<std::string>>(BucketConfigs),
"");
+ options.add_option("stats",
+ "",
+ "statsd",
+ "",
+ cxxopts::value<bool>(ServerOptions.StatsConfig.Enabled)->default_value("false"),
+ "Enable statsd reporter (localhost:8125)");
+
try
{
auto result = options.parse(argc, argv);
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 7743e536f..a1e091665 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -96,6 +96,13 @@ struct ZenObjectStoreConfig
std::vector<BucketConfig> Buckets;
};
+struct ZenStatsConfig
+{
+ bool Enabled = false;
+ std::string StatsdHost = "localhost";
+ int StatsdPort = 8125;
+};
+
struct ZenStructuredCacheConfig
{
bool Enabled = true;
@@ -116,6 +123,7 @@ struct ZenServerOptions
ZenObjectStoreConfig ObjectStoreConfig;
zen::HttpServerConfig HttpServerConfig;
ZenStructuredCacheConfig StructuredCacheConfig;
+ ZenStatsConfig StatsConfig;
std::filesystem::path DataDir; // Root directory for state (used for testing)
std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
std::filesystem::path AbsLogFile; // Absolute path to main log file
diff --git a/src/zenserver/stats/statsreporter.cpp b/src/zenserver/stats/statsreporter.cpp
new file mode 100644
index 000000000..5d5ef4bfa
--- /dev/null
+++ b/src/zenserver/stats/statsreporter.cpp
@@ -0,0 +1,60 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "statsreporter.h"
+
+#include <zencore/logging.h>
+#include <zennet/statsdclient.h>
+
+namespace zen {
+
+StatsReporter::StatsReporter()
+{
+}
+
+StatsReporter::~StatsReporter()
+{
+}
+
+void
+StatsReporter::Initialize(const ZenStatsConfig& Config)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (Config.Enabled)
+ {
+ ZEN_INFO("initializing stats reporter: {}:{}", Config.StatsdHost, Config.StatsdPort)
+ m_Statsd = CreateStatsDaemonClient(Config.StatsdHost, gsl::narrow<uint16_t>(Config.StatsdPort));
+ m_Statsd->SetMessageSize(1500, false);
+ }
+}
+
+void
+StatsReporter::Shutdown()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Statsd.reset();
+}
+
+void
+StatsReporter::AddProvider(StatsProvider* Provider)
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Providers.push_back(Provider);
+}
+
+void
+StatsReporter::ReportStats()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (m_Statsd)
+ {
+ for (StatsProvider* Provider : m_Providers)
+ {
+ Provider->ReportMetrics(*m_Statsd);
+ }
+
+ m_Statsd->Flush();
+ }
+}
+
+} // namespace zen
diff --git a/src/zenserver/stats/statsreporter.h b/src/zenserver/stats/statsreporter.h
new file mode 100644
index 000000000..ed6ec6c55
--- /dev/null
+++ b/src/zenserver/stats/statsreporter.h
@@ -0,0 +1,39 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "config.h"
+
+#include <zencore/thread.h>
+
+namespace zen {
+
+class StatsDaemonClient;
+
+class StatsProvider
+{
+public:
+ virtual void ReportMetrics(StatsDaemonClient& Statsd) = 0;
+};
+
+class StatsReporter
+{
+public:
+ StatsReporter();
+ ~StatsReporter();
+
+ StatsReporter& operator=(const StatsReporter&) = delete;
+ StatsReporter(const StatsReporter&) = delete;
+
+ void Initialize(const ZenStatsConfig& Config);
+ void Shutdown();
+ void AddProvider(StatsProvider* Provider);
+ void ReportStats();
+
+private:
+ RwLock m_Lock;
+ std::unique_ptr<StatsDaemonClient> m_Statsd;
+ std::vector<StatsProvider*> m_Providers;
+};
+
+} // namespace zen
diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua
index 329435acb..123690be2 100644
--- a/src/zenserver/xmake.lua
+++ b/src/zenserver/xmake.lua
@@ -2,8 +2,12 @@
target("zenserver")
set_kind("binary")
- add_deps("zencore", "zenhttp", "zenstore", "zenutil")
- add_deps("zenvfs")
+ add_deps("zencore",
+ "zenhttp",
+ "zennet",
+ "zenstore",
+ "zenutil",
+ "zenvfs")
add_headerfiles("**.h")
add_files("**.cpp")
add_files("zenserver.cpp", {unity_ignored = true })
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index e5346abf0..d7fc2d069 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -41,7 +41,6 @@
ZEN_THIRD_PARTY_INCLUDES_START
#include <fmt/format.h>
#include <asio.hpp>
-#include <lua.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
#include <exception>
@@ -197,7 +196,14 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_Http->RegisterService(*m_AuthService);
m_Http->RegisterService(m_HealthService);
+
m_Http->RegisterService(m_StatsService);
+ m_StatsReporter.Initialize(ServerOptions.StatsConfig);
+ if (ServerOptions.StatsConfig.Enabled)
+ {
+ EnqueueStatsReportingTimer();
+ }
+
m_Http->RegisterService(m_StatusService);
m_StatusService.RegisterHandler("status", *this);
@@ -260,10 +266,12 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_Http->RegisterService(*m_ObjStoreService);
}
+#if ZEN_WITH_VFS
m_VfsService = std::make_unique<VfsService>();
m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore));
m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore));
m_Http->RegisterService(*m_VfsService);
+#endif
ZEN_INFO("initializing GC, enabled '{}', interval {}, lightweight interval {}",
ServerOptions.GcConfig.Enabled,
@@ -514,6 +522,8 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
m_Http->RegisterService(*m_StructuredCacheService);
m_Http->RegisterService(*m_UpstreamService);
+
+ m_StatsReporter.AddProvider(m_CacheStore.Get());
}
void
@@ -598,6 +608,8 @@ ZenServer::Cleanup()
m_JobQueue->Stop();
}
+ m_StatsReporter.Shutdown();
+
m_GcScheduler.Shutdown();
m_AdminService.reset();
m_VfsService.reset();
@@ -661,6 +673,20 @@ ZenServer::EnqueueSigIntTimer()
}
void
+ZenServer::EnqueueStatsReportingTimer()
+{
+ m_StatsReportingTimer.expires_after(std::chrono::milliseconds(500));
+ m_StatsReportingTimer.async_wait([this](const asio::error_code& Ec) {
+ if (!Ec)
+ {
+ m_StatsReporter.ReportStats();
+ EnqueueStatsReportingTimer();
+ }
+ });
+ EnsureIoRunner();
+}
+
+void
ZenServer::CheckStateMarker()
{
std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker";
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 25e45ccd0..de069be69 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "objectstore/objectstore.h"
#include "projectstore/httpprojectstore.h"
#include "projectstore/projectstore.h"
+#include "stats/statsreporter.h"
#include "upstream/upstream.h"
#include "vfs/vfsservice.h"
@@ -75,6 +76,7 @@ public:
void EnqueueTimer();
void EnqueueStateMarkerTimer();
void EnqueueSigIntTimer();
+ void EnqueueStatsReportingTimer();
void CheckStateMarker();
void CheckSigInt();
void CheckOwnerPid();
@@ -96,6 +98,7 @@ private:
asio::steady_timer m_PidCheckTimer{m_IoContext};
asio::steady_timer m_StateMakerTimer{m_IoContext};
asio::steady_timer m_SigIntTimer{m_IoContext};
+ asio::steady_timer m_StatsReportingTimer{m_IoContext};
ProcessMonitor m_ProcessMonitor;
NamedMutex m_ServerMutex;
@@ -109,6 +112,7 @@ private:
inline void SetNewState(ServerState NewState) { m_CurrentState = NewState; }
static std::string_view ToString(ServerState Value);
+ StatsReporter m_StatsReporter;
Ref<HttpServer> m_Http;
std::unique_ptr<AuthMgr> m_AuthMgr;
std::unique_ptr<HttpAuthService> m_AuthService;