// Copyright Epic Games, Inc. All Rights Reserved. #include "zenhttp/httpstats.h" #include #include #include #include namespace zen { HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats")) { if (EnableWebSockets) { m_PushEnabled.store(true); m_PushThread = std::thread([this] { PushThreadFunction(); }); } } HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats")) { if (EnableWebSockets) { m_PushEnabled.store(true); m_PushTimer = std::make_unique(IoContext); EnqueuePushTimer(); } } HttpStatsService::~HttpStatsService() { Shutdown(); } void HttpStatsService::Shutdown() { if (!m_PushEnabled.exchange(false)) { return; } if (m_PushTimer) { m_PushTimer->cancel(); m_PushTimer.reset(); } if (m_PushThread.joinable()) { m_PushEvent.Set(); m_PushThread.join(); } m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); }); } const char* HttpStatsService::BaseUri() const { return "/stats"; } void HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) { RwLock::ExclusiveLockScope _(m_Lock); m_Providers.insert_or_assign(std::string(Id), &Provider); } void HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Provider) { ZEN_UNUSED(Provider); RwLock::ExclusiveLockScope _(m_Lock); m_Providers.erase(std::string(Id)); } void HttpStatsService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpStatsService::HandleRequest"); using namespace std::literals; std::string_view Key = Request.RelativeUri(); switch (Request.RequestVerb()) { case HttpVerb::kHead: case HttpVerb::kGet: { if (Key.empty()) { CbObjectWriter Cbo; Cbo.BeginArray("providers"); { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Providers) { Cbo << Kv.first; } } Cbo.EndArray(); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } else if (Key[0] == '/') { Key.remove_prefix(1); size_t SlashPos = Key.find_first_of("/?"); if (SlashPos != std::string::npos) { Key = Key.substr(0, SlashPos); } RwLock::SharedLockScope _(m_Lock); if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) { return It->second->HandleStatsRequest(Request); } } } [[fallthrough]]; default: return; } } ////////////////////////////////////////////////////////////////////////// // // IWebSocketHandler // void HttpStatsService::OnWebSocketOpen(Ref Connection) { ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen"); ZEN_INFO("Stats WebSocket client connected"); m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); // Send initial state immediately if (m_PushThread.joinable()) { m_PushEvent.Set(); } } void HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) { // No client-to-server messages expected } void HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) { ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose"); ZEN_INFO("Stats WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref& C) { return C.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); } ////////////////////////////////////////////////////////////////////////// // // Stats broadcast // void HttpStatsService::BroadcastStats() { ZEN_TRACE_CPU("HttpStatsService::BroadcastStats"); std::vector> Connections; m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); if (Connections.empty()) { return; } // Collect stats from all providers ExtendableStringBuilder<4096> JsonBuilder; JsonBuilder.Append("{"); bool First = true; { RwLock::SharedLockScope _(m_Lock); for (auto& [Id, Provider] : m_Providers) { CbObject Stats = Provider->CollectStats(); if (!Stats) { continue; } if (!First) { JsonBuilder.Append(","); } First = false; // Emit as "provider_id": { ... } JsonBuilder.Append("\""); JsonBuilder.Append(Id); JsonBuilder.Append("\":"); ExtendableStringBuilder<2048> StatsJson; Stats.ToJson(StatsJson); JsonBuilder.Append(StatsJson.ToView()); } } JsonBuilder.Append("}"); std::string_view Json = JsonBuilder.ToView(); for (auto& Conn : Connections) { if (Conn->IsOpen()) { Conn->SendText(Json); } } } ////////////////////////////////////////////////////////////////////////// // // Thread-based push (fallback when no io_context) // void HttpStatsService::PushThreadFunction() { SetCurrentThreadName("stats_ws_push"); while (m_PushEnabled.load()) { m_PushEvent.Wait(1000); m_PushEvent.Reset(); if (!m_PushEnabled.load()) { break; } BroadcastStats(); } } ////////////////////////////////////////////////////////////////////////// // // Timer-based push (when io_context is provided) // void HttpStatsService::EnqueuePushTimer() { if (!m_PushTimer) { return; } m_PushTimer->expires_after(std::chrono::seconds(1)); m_PushTimer->async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } BroadcastStats(); EnqueuePushTimer(); }); } } // namespace zen