aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/monitoring/httpstats.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhttp/monitoring/httpstats.cpp')
-rw-r--r--src/zenhttp/monitoring/httpstats.cpp312
1 files changed, 274 insertions, 38 deletions
diff --git a/src/zenhttp/monitoring/httpstats.cpp b/src/zenhttp/monitoring/httpstats.cpp
index b097a0d3f..7e6207e56 100644
--- a/src/zenhttp/monitoring/httpstats.cpp
+++ b/src/zenhttp/monitoring/httpstats.cpp
@@ -3,15 +3,162 @@
#include "zenhttp/httpstats.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/string.h>
+#include <zencore/thread.h>
+#include <zencore/trace.h>
namespace zen {
-HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats"))
+HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats"))
{
+ if (EnableWebSockets)
+ {
+ m_PushEnabled.store(true);
+ m_PushThread = std::thread([this] { PushThreadFunction(); });
+ }
+ Initialize();
+}
+
+HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats"))
+{
+ if (EnableWebSockets)
+ {
+ m_PushEnabled.store(true);
+ m_PushTimer = std::make_unique<asio::steady_timer>(IoContext);
+ EnqueuePushTimer();
+ }
+ Initialize();
+}
+
+void
+HttpStatsService::Initialize()
+{
+ m_Router.AddMatcher("handler_id", [](std::string_view Str) -> bool {
+ if (Str.empty())
+ {
+ return false;
+ }
+ for (const auto C : Str)
+ {
+ if (std::isalnum(C) || C == '$')
+ {
+ // fine
+ }
+ else
+ {
+ // not fine
+ return false;
+ }
+ }
+ return true;
+ });
+
+ m_Router.RegisterRoute(
+ "activity_counters",
+ [this](HttpRouterRequest& Request) {
+ CbObjectWriter Obj;
+
+ std::uint64_t SumActivity = 0;
+
+ std::vector<std::pair<std::string, uint64_t>> Activities;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ Activities.reserve(m_Providers.size());
+ for (const auto& It : m_Providers)
+ {
+ const std::string& HandlerName = It.first;
+ IHttpStatsProvider* Provider = It.second;
+ ZEN_ASSERT(Provider != nullptr);
+ uint64_t ProviderActivityCounter = Provider->GetActivityCounter();
+ if (ProviderActivityCounter != 0)
+ {
+ Activities.push_back(std::make_pair(HandlerName, ProviderActivityCounter));
+ }
+ SumActivity += ProviderActivityCounter;
+ }
+ }
+
+ Obj.BeginArray("providers");
+ for (const std::pair<std::string, uint64_t>& Activity : Activities)
+ {
+ const std::string& HandlerName = Activity.first;
+ uint64_t ProviderActivityCounter = Activity.second;
+ Obj.BeginObject();
+ {
+ Obj.AddString("provider", HandlerName);
+ Obj.AddInteger("activity_counter", ProviderActivityCounter);
+ }
+ Obj.EndObject();
+ }
+ Obj.EndArray();
+
+ Obj.AddInteger("sum", SumActivity);
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{handler_id}",
+ [this](HttpRouterRequest& Request) {
+ std::string_view Handler = Request.GetCapture(1);
+ RwLock::SharedLockScope _(m_Lock);
+ if (auto It = m_Providers.find(std::string{Handler}); It != end(m_Providers))
+ {
+ return It->second->HandleStatsRequest(Request.ServerRequest());
+ }
+ Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ },
+ HttpVerb::kHead | HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "",
+ [this](HttpRouterRequest& Request) {
+ CbObjectWriter Cbo;
+
+ Cbo.BeginArray("providers");
+
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& Kv : m_Providers)
+ {
+ Cbo << Kv.first;
+ }
+ }
+
+ Cbo.EndArray();
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kHead | HttpVerb::kGet);
}
HttpStatsService::~HttpStatsService()
{
+ Shutdown();
+}
+
+void
+HttpStatsService::Shutdown()
+{
+ if (!m_PushEnabled.exchange(false))
+ {
+ return;
+ }
+
+ if (m_PushTimer)
+ {
+ m_PushTimer->cancel();
+ m_PushTimer.reset();
+ }
+
+ if (m_PushThread.joinable())
+ {
+ m_PushEvent.Set();
+ m_PushThread.join();
+ }
+
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); });
}
const char*
@@ -39,54 +186,143 @@ HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Pro
void
HttpStatsService::HandleRequest(HttpServerRequest& Request)
{
- using namespace std::literals;
+ ZEN_TRACE_CPU("HttpStatsService::HandleRequest");
+ m_Router.HandleRequest(Request);
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// IWebSocketHandler
+//
- std::string_view Key = Request.RelativeUri();
+void
+HttpStatsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen");
+ ZEN_INFO("Stats WebSocket client connected");
- switch (Request.RequestVerb())
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+
+ // Send initial state immediately
+ if (m_PushThread.joinable())
{
- case HttpVerb::kHead:
- case HttpVerb::kGet:
- {
- if (Key.empty())
- {
- CbObjectWriter Cbo;
+ m_PushEvent.Set();
+ }
+}
- Cbo.BeginArray("providers");
+void
+HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+{
+ // No client-to-server messages expected
+}
- {
- RwLock::SharedLockScope _(m_Lock);
- for (auto& Kv : m_Providers)
- {
- Cbo << Kv.first;
- }
- }
+void
+HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose");
+ ZEN_INFO("Stats WebSocket client disconnected (code {})", Code);
- Cbo.EndArray();
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- }
- else if (Key[0] == '/')
- {
- Key.remove_prefix(1);
- size_t SlashPos = Key.find_first_of("/?");
- if (SlashPos != std::string::npos)
- {
- Key = Key.substr(0, SlashPos);
- }
+//////////////////////////////////////////////////////////////////////////
+//
+// Stats broadcast
+//
- RwLock::SharedLockScope _(m_Lock);
- if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers))
- {
- return It->second->HandleStatsRequest(Request);
- }
- }
+void
+HttpStatsService::BroadcastStats()
+{
+ ZEN_TRACE_CPU("HttpStatsService::BroadcastStats");
+ std::vector<Ref<WebSocketConnection>> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ // Collect stats from all providers into a single CBO object, then convert to JSON
+ CbObjectWriter Writer;
+ {
+ RwLock::SharedLockScope _(m_Lock);
+ for (auto& [Id, Provider] : m_Providers)
+ {
+ CbObject Stats = Provider->CollectStats();
+ if (!Stats)
+ {
+ continue;
}
+ Writer.AddObject(Id, Stats);
+ }
+ }
- [[fallthrough]];
- default:
- return;
+ CbObject Payload = Writer.Save();
+ ExtendableStringBuilder<4096> JsonBuilder;
+ Payload.ToJson(JsonBuilder);
+ std::string_view Json = JsonBuilder.ToView();
+
+ for (auto& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText(Json);
+ }
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Thread-based push (fallback when no io_context)
+//
+
+void
+HttpStatsService::PushThreadFunction()
+{
+ SetCurrentThreadName("stats_ws_push");
+
+ while (m_PushEnabled.load())
+ {
+ m_PushEvent.Wait(1000);
+ m_PushEvent.Reset();
+
+ if (!m_PushEnabled.load())
+ {
+ break;
+ }
+
+ BroadcastStats();
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Timer-based push (when io_context is provided)
+//
+
+void
+HttpStatsService::EnqueuePushTimer()
+{
+ if (!m_PushTimer)
+ {
+ return;
}
+
+ m_PushTimer->expires_after(std::chrono::seconds(1));
+ m_PushTimer->async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+
+ BroadcastStats();
+ EnqueuePushTimer();
+ });
}
} // namespace zen