diff options
Diffstat (limited to 'src/zenhttp/monitoring/httpstats.cpp')
| -rw-r--r-- | src/zenhttp/monitoring/httpstats.cpp | 312 |
1 files changed, 274 insertions, 38 deletions
diff --git a/src/zenhttp/monitoring/httpstats.cpp b/src/zenhttp/monitoring/httpstats.cpp index b097a0d3f..7e6207e56 100644 --- a/src/zenhttp/monitoring/httpstats.cpp +++ b/src/zenhttp/monitoring/httpstats.cpp @@ -3,15 +3,162 @@ #include "zenhttp/httpstats.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/string.h> +#include <zencore/thread.h> +#include <zencore/trace.h> namespace zen { -HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats")) +HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats")) { + if (EnableWebSockets) + { + m_PushEnabled.store(true); + m_PushThread = std::thread([this] { PushThreadFunction(); }); + } + Initialize(); +} + +HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats")) +{ + if (EnableWebSockets) + { + m_PushEnabled.store(true); + m_PushTimer = std::make_unique<asio::steady_timer>(IoContext); + EnqueuePushTimer(); + } + Initialize(); +} + +void +HttpStatsService::Initialize() +{ + m_Router.AddMatcher("handler_id", [](std::string_view Str) -> bool { + if (Str.empty()) + { + return false; + } + for (const auto C : Str) + { + if (std::isalnum(C) || C == '$') + { + // fine + } + else + { + // not fine + return false; + } + } + return true; + }); + + m_Router.RegisterRoute( + "activity_counters", + [this](HttpRouterRequest& Request) { + CbObjectWriter Obj; + + std::uint64_t SumActivity = 0; + + std::vector<std::pair<std::string, uint64_t>> Activities; + { + RwLock::SharedLockScope _(m_Lock); + Activities.reserve(m_Providers.size()); + for (const auto& It : m_Providers) + { + const std::string& HandlerName = It.first; + IHttpStatsProvider* Provider = It.second; + ZEN_ASSERT(Provider != nullptr); + uint64_t ProviderActivityCounter = Provider->GetActivityCounter(); + if (ProviderActivityCounter != 0) + { + Activities.push_back(std::make_pair(HandlerName, ProviderActivityCounter)); + } + SumActivity += ProviderActivityCounter; + } + } + + Obj.BeginArray("providers"); + for (const std::pair<std::string, uint64_t>& Activity : Activities) + { + const std::string& HandlerName = Activity.first; + uint64_t ProviderActivityCounter = Activity.second; + Obj.BeginObject(); + { + Obj.AddString("provider", HandlerName); + Obj.AddInteger("activity_counter", ProviderActivityCounter); + } + Obj.EndObject(); + } + Obj.EndArray(); + + Obj.AddInteger("sum", SumActivity); + + Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{handler_id}", + [this](HttpRouterRequest& Request) { + std::string_view Handler = Request.GetCapture(1); + RwLock::SharedLockScope _(m_Lock); + if (auto It = m_Providers.find(std::string{Handler}); It != end(m_Providers)) + { + return It->second->HandleStatsRequest(Request.ServerRequest()); + } + Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + }, + HttpVerb::kHead | HttpVerb::kGet); + + m_Router.RegisterRoute( + "", + [this](HttpRouterRequest& Request) { + CbObjectWriter Cbo; + + Cbo.BeginArray("providers"); + + { + RwLock::SharedLockScope _(m_Lock); + for (auto& Kv : m_Providers) + { + Cbo << Kv.first; + } + } + + Cbo.EndArray(); + + Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kHead | HttpVerb::kGet); } HttpStatsService::~HttpStatsService() { + Shutdown(); +} + +void +HttpStatsService::Shutdown() +{ + if (!m_PushEnabled.exchange(false)) + { + return; + } + + if (m_PushTimer) + { + m_PushTimer->cancel(); + m_PushTimer.reset(); + } + + if (m_PushThread.joinable()) + { + m_PushEvent.Set(); + m_PushThread.join(); + } + + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); }); } const char* @@ -39,54 +186,143 @@ HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Pro void HttpStatsService::HandleRequest(HttpServerRequest& Request) { - using namespace std::literals; + ZEN_TRACE_CPU("HttpStatsService::HandleRequest"); + m_Router.HandleRequest(Request); +} + +////////////////////////////////////////////////////////////////////////// +// +// IWebSocketHandler +// - std::string_view Key = Request.RelativeUri(); +void +HttpStatsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen"); + ZEN_INFO("Stats WebSocket client connected"); - switch (Request.RequestVerb()) + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); + + // Send initial state immediately + if (m_PushThread.joinable()) { - case HttpVerb::kHead: - case HttpVerb::kGet: - { - if (Key.empty()) - { - CbObjectWriter Cbo; + m_PushEvent.Set(); + } +} - Cbo.BeginArray("providers"); +void +HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) +{ + // No client-to-server messages expected +} - { - RwLock::SharedLockScope _(m_Lock); - for (auto& Kv : m_Providers) - { - Cbo << Kv.first; - } - } +void +HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose"); + ZEN_INFO("Stats WebSocket client disconnected (code {})", Code); - Cbo.EndArray(); + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - } - else if (Key[0] == '/') - { - Key.remove_prefix(1); - size_t SlashPos = Key.find_first_of("/?"); - if (SlashPos != std::string::npos) - { - Key = Key.substr(0, SlashPos); - } +////////////////////////////////////////////////////////////////////////// +// +// Stats broadcast +// - RwLock::SharedLockScope _(m_Lock); - if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) - { - return It->second->HandleStatsRequest(Request); - } - } +void +HttpStatsService::BroadcastStats() +{ + ZEN_TRACE_CPU("HttpStatsService::BroadcastStats"); + std::vector<Ref<WebSocketConnection>> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); + + if (Connections.empty()) + { + return; + } + + // Collect stats from all providers into a single CBO object, then convert to JSON + CbObjectWriter Writer; + { + RwLock::SharedLockScope _(m_Lock); + for (auto& [Id, Provider] : m_Providers) + { + CbObject Stats = Provider->CollectStats(); + if (!Stats) + { + continue; } + Writer.AddObject(Id, Stats); + } + } - [[fallthrough]]; - default: - return; + CbObject Payload = Writer.Save(); + ExtendableStringBuilder<4096> JsonBuilder; + Payload.ToJson(JsonBuilder); + std::string_view Json = JsonBuilder.ToView(); + + for (auto& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendText(Json); + } + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Thread-based push (fallback when no io_context) +// + +void +HttpStatsService::PushThreadFunction() +{ + SetCurrentThreadName("stats_ws_push"); + + while (m_PushEnabled.load()) + { + m_PushEvent.Wait(1000); + m_PushEvent.Reset(); + + if (!m_PushEnabled.load()) + { + break; + } + + BroadcastStats(); + } +} + +////////////////////////////////////////////////////////////////////////// +// +// Timer-based push (when io_context is provided) +// + +void +HttpStatsService::EnqueuePushTimer() +{ + if (!m_PushTimer) + { + return; } + + m_PushTimer->expires_after(std::chrono::seconds(1)); + m_PushTimer->async_wait([this](const asio::error_code& Ec) { + if (Ec) + { + return; + } + + BroadcastStats(); + EnqueuePushTimer(); + }); } } // namespace zen |