// Copyright Epic Games, Inc. All Rights Reserved. #include "zenhttp/httpstats.h" #include #include #include #include namespace zen { HttpStatsService::HttpStatsService(bool EnableWebSockets) : m_Log(logging::Get("stats")) { if (EnableWebSockets) { m_PushEnabled.store(true); m_PushThread = std::thread([this] { PushThreadFunction(); }); } Initialize(); } HttpStatsService::HttpStatsService(asio::io_context& IoContext, bool EnableWebSockets) : m_Log(logging::Get("stats")) { if (EnableWebSockets) { m_PushEnabled.store(true); m_PushTimer = std::make_unique(IoContext); EnqueuePushTimer(); } Initialize(); } void HttpStatsService::Initialize() { m_Router.AddMatcher("handler_id", [](std::string_view Str) -> bool { if (Str.empty()) { return false; } for (const auto C : Str) { if (std::isalnum(C) || C == '$') { // fine } else { // not fine return false; } } return true; }); m_Router.RegisterRoute( "activity_counters", [this](HttpRouterRequest& Request) { CbObjectWriter Obj; std::uint64_t SumActivity = 0; std::vector> Activities; { RwLock::SharedLockScope _(m_Lock); Activities.reserve(m_Providers.size()); for (const auto& It : m_Providers) { const std::string& HandlerName = It.first; IHttpStatsProvider* Provider = It.second; ZEN_ASSERT(Provider != nullptr); uint64_t ProviderActivityCounter = Provider->GetActivityCounter(); if (ProviderActivityCounter != 0) { Activities.push_back(std::make_pair(HandlerName, ProviderActivityCounter)); } SumActivity += ProviderActivityCounter; } } Obj.BeginArray("providers"); for (const std::pair& Activity : Activities) { const std::string& HandlerName = Activity.first; uint64_t ProviderActivityCounter = Activity.second; Obj.BeginObject(); { Obj.AddString("provider", HandlerName); Obj.AddInteger("activity_counter", ProviderActivityCounter); } Obj.EndObject(); } Obj.EndArray(); Obj.AddInteger("sum", SumActivity); Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "{handler_id}", [this](HttpRouterRequest& Request) { std::string_view Handler = Request.GetCapture(1); RwLock::SharedLockScope _(m_Lock); if (auto It = m_Providers.find(std::string{Handler}); It != end(m_Providers)) { return It->second->HandleStatsRequest(Request.ServerRequest()); } Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound); }, HttpVerb::kHead | HttpVerb::kGet); m_Router.RegisterRoute( "", [this](HttpRouterRequest& Request) { CbObjectWriter Cbo; Cbo.BeginArray("providers"); { RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Providers) { Cbo << Kv.first; } } Cbo.EndArray(); Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kHead | HttpVerb::kGet); } HttpStatsService::~HttpStatsService() { Shutdown(); } void HttpStatsService::Shutdown() { if (!m_PushEnabled.exchange(false)) { return; } if (m_PushTimer) { m_PushTimer->cancel(); m_PushTimer.reset(); } if (m_PushThread.joinable()) { m_PushEvent.Set(); m_PushThread.join(); } m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.clear(); }); } const char* HttpStatsService::BaseUri() const { return "/stats"; } void HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) { RwLock::ExclusiveLockScope _(m_Lock); m_Providers.insert_or_assign(std::string(Id), &Provider); } void HttpStatsService::UnregisterHandler(std::string_view Id, IHttpStatsProvider& Provider) { ZEN_UNUSED(Provider); RwLock::ExclusiveLockScope _(m_Lock); m_Providers.erase(std::string(Id)); } void HttpStatsService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpStatsService::HandleRequest"); m_Router.HandleRequest(Request); } ////////////////////////////////////////////////////////////////////////// // // IWebSocketHandler // void HttpStatsService::OnWebSocketOpen(Ref Connection) { ZEN_TRACE_CPU("HttpStatsService::OnWebSocketOpen"); ZEN_INFO("Stats WebSocket client connected"); m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); // Send initial state immediately if (m_PushThread.joinable()) { m_PushEvent.Set(); } } void HttpStatsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/) { // No client-to-server messages expected } void HttpStatsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) { ZEN_TRACE_CPU("HttpStatsService::OnWebSocketClose"); ZEN_INFO("Stats WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref& C) { return C.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); } ////////////////////////////////////////////////////////////////////////// // // Stats broadcast // void HttpStatsService::BroadcastStats() { ZEN_TRACE_CPU("HttpStatsService::BroadcastStats"); std::vector> Connections; m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; }); if (Connections.empty()) { return; } // Collect stats from all providers into a single CBO object, then convert to JSON CbObjectWriter Writer; { RwLock::SharedLockScope _(m_Lock); for (auto& [Id, Provider] : m_Providers) { CbObject Stats = Provider->CollectStats(); if (!Stats) { continue; } Writer.AddObject(Id, Stats); } } CbObject Payload = Writer.Save(); ExtendableStringBuilder<4096> JsonBuilder; Payload.ToJson(JsonBuilder); std::string_view Json = JsonBuilder.ToView(); for (auto& Conn : Connections) { if (Conn->IsOpen()) { Conn->SendText(Json); } } } ////////////////////////////////////////////////////////////////////////// // // Thread-based push (fallback when no io_context) // void HttpStatsService::PushThreadFunction() { SetCurrentThreadName("stats_ws_push"); while (m_PushEnabled.load()) { m_PushEvent.Wait(1000); m_PushEvent.Reset(); if (!m_PushEnabled.load()) { break; } BroadcastStats(); } } ////////////////////////////////////////////////////////////////////////// // // Timer-based push (when io_context is provided) // void HttpStatsService::EnqueuePushTimer() { if (!m_PushTimer) { return; } m_PushTimer->expires_after(std::chrono::seconds(1)); m_PushTimer->async_wait([this](const asio::error_code& Ec) { if (Ec) { return; } BroadcastStats(); EnqueuePushTimer(); }); } } // namespace zen