aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/monitoring/httpstats.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/monitoring/httpstats.cpp')
-rw-r--r--src/zenhttp/monitoring/httpstats.cpp195
1 files changed, 194 insertions, 1 deletions
diff --git a/src/zenhttp/monitoring/httpstats.cpp b/src/zenhttp/monitoring/httpstats.cpp
index b097a0d3f..2370def0c 100644
--- a/src/zenhttp/monitoring/httpstats.cpp
+++ b/src/zenhttp/monitoring/httpstats.cpp
@@ -3,15 +3,57 @@
#include "zenhttp/httpstats.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
namespace zen {
-HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats"))
+HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats"))
{
+ if (EnableWebSockets)
+ {
+ m_PushEnabled.store(true);
+ m_PushThread = std::thread([this] { PushThreadFunction(); });
+ }
+}
+
+HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats"))
+{
+ if (EnableWebSockets)
+ {
+ m_PushEnabled.store(true);
+ m_PushTimer = std::make_unique<asio::steady_timer>(IoContext);
+ EnqueuePushTimer();
+ }
}
HttpStatsService::~HttpStatsService()
{
+ Shutdown();
+}
+
+void
+HttpStatsService::Shutdown()
+{
+ if (!m_PushEnabled.exchange(false))
+ {
+ return;
+ }
+
+ if (m_PushTimer)
+ {
+ m_PushTimer->cancel();
+ m_PushTimer.reset();
+ }
+
+ if (m_PushThread.joinable())
+ {
+ m_PushEvent.Set();
+ m_PushThread.join();
+ }
+
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); });
}
const char*
@@ -39,6 +81,7 @@ HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Pro
void
HttpStatsService::HandleRequest(HttpServerRequest& Request)
{
+ ZEN_TRACE_CPU("HttpStatsService::HandleRequest");
using namespace std::literals;
std::string_view Key = Request.RelativeUri();
@@ -89,4 +132,154 @@ HttpStatsService::HandleRequest(HttpServerRequest& Request)
}
}
+//////////////////////////////////////////////////////////////////////////
+//
+// IWebSocketHandler
+//
+
+void
+HttpStatsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen");
+ ZEN_INFO("Stats WebSocket client connected");
+
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+
+ // Send initial state immediately
+ if (m_PushThread.joinable())
+ {
+ m_PushEvent.Set();
+ }
+}
+
+void
+HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+{
+ // No client-to-server messages expected
+}
+
+void
+HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose");
+ ZEN_INFO("Stats WebSocket client disconnected (code {})", Code);
+
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Stats broadcast
+//
+
+void
+HttpStatsService::BroadcastStats()
+{
+ ZEN_TRACE_CPU("HttpStatsService::BroadcastStats");
+ std::vector<Ref<WebSocketConnection>> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ // Collect stats from all providers
+ ExtendableStringBuilder<4096> JsonBuilder;
+ JsonBuilder.Append("{");
+
+ bool First = true;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& [Id, Provider] : m_Providers)
+ {
+ CbObject Stats = Provider->CollectStats();
+ if (!Stats)
+ {
+ continue;
+ }
+
+ if (!First)
+ {
+ JsonBuilder.Append(",");
+ }
+ First = false;
+
+ // Emit as "provider_id": { ... }
+ JsonBuilder.Append("\"");
+ JsonBuilder.Append(Id);
+ JsonBuilder.Append("\":");
+
+ ExtendableStringBuilder<2048> StatsJson;
+ Stats.ToJson(StatsJson);
+ JsonBuilder.Append(StatsJson.ToView());
+ }
+ }
+
+ JsonBuilder.Append("}");
+
+ std::string_view Json = JsonBuilder.ToView();
+ for (auto& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText(Json);
+ }
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Thread-based push (fallback when no io_context)
+//
+
+void
+HttpStatsService::PushThreadFunction()
+{
+ SetCurrentThreadName("stats_ws_push");
+
+ while (m_PushEnabled.load())
+ {
+ m_PushEvent.Wait(5000);
+ m_PushEvent.Reset();
+
+ if (!m_PushEnabled.load())
+ {
+ break;
+ }
+
+ BroadcastStats();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Timer-based push (when io_context is provided)
+//
+
+void
+HttpStatsService::EnqueuePushTimer()
+{
+ if (!m_PushTimer)
+ {
+ return;
+ }
+
+ m_PushTimer->expires_after(std::chrono::seconds(5));
+ m_PushTimer->async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+
+ BroadcastStats();
+ EnqueuePushTimer();
+ });
+}
+
} // namespace zen