aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/sessions/httpsessions.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/sessions/httpsessions.cpp')
-rw-r--r--src/zenserver/sessions/httpsessions.cpp173
1 files changed, 145 insertions, 28 deletions
diff --git a/src/zenserver/sessions/httpsessions.cpp b/src/zenserver/sessions/httpsessions.cpp
index 6cf12bea4..6996ce5b5 100644
--- a/src/zenserver/sessions/httpsessions.cpp
+++ b/src/zenserver/sessions/httpsessions.cpp
@@ -3,7 +3,6 @@
#include "httpsessions.h"
#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/trace.h>
@@ -12,17 +11,22 @@
namespace zen {
using namespace std::literals;
-HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService, HttpStatsService& StatsService, SessionsService& Sessions)
+HttpSessionsService::HttpSessionsService(HttpStatusService& StatusService,
+ HttpStatsService& StatsService,
+ SessionsService& Sessions,
+ asio::io_context& IoContext)
: m_Log(logging::Get("sessions"))
, m_StatusService(StatusService)
, m_StatsService(StatsService)
, m_Sessions(Sessions)
+, m_PushTimer(IoContext)
{
Initialize();
}
HttpSessionsService::~HttpSessionsService()
{
+ m_PushTimer.cancel();
m_StatsService.UnregisterHandler("sessions", *this);
m_StatusService.UnregisterHandler("sessions", *this);
}
@@ -111,8 +115,15 @@ HttpSessionsService::Initialize()
[this](HttpRouterRequest& Req) { ListSessionsRequest(Req); },
HttpVerb::kGet);
+ m_Router.RegisterRoute(
+ "ws",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
+ HttpVerb::kGet);
+
m_StatsService.RegisterHandler("sessions", *this);
m_StatusService.RegisterHandler("sessions", *this);
+
+ EnqueuePushTimer();
}
static void
@@ -123,22 +134,53 @@ WriteSessionInfo(CbWriter& Writer, const SessionsService::SessionInfo& Info)
{
Writer << "appname" << Info.AppName;
}
+ if (!Info.Mode.empty())
+ {
+ Writer << "mode" << Info.Mode;
+ }
if (Info.JobId != Oid::Zero)
{
Writer << "jobid" << Info.JobId;
}
Writer << "created_at" << Info.CreatedAt;
Writer << "updated_at" << Info.UpdatedAt;
+ if (Info.EndedAt.GetTicks() != 0)
+ {
+ Writer << "ended_at" << Info.EndedAt;
+ }
if (Info.Metadata.GetSize() > 0)
{
- Writer.BeginObject("metadata");
- for (const CbField& Field : Info.Metadata)
- {
- Writer.AddField(Field);
- }
- Writer.EndObject();
+ Writer.AddObject("metadata"sv, Info.Metadata);
+ }
+}
+
+CbObject
+HttpSessionsService::BuildSessionListResponse()
+{
+ std::vector<Ref<SessionsService::Session>> Active = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+
+ CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
+ Response.BeginArray("sessions");
+ for (const Ref<SessionsService::Session>& Session : Active)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
+ }
+ for (const Ref<SessionsService::Session>& Session : Ended)
+ {
+ Response.BeginObject();
+ WriteSessionInfo(Response, Session->Info());
+ Response.EndObject();
}
+ Response.EndArray();
+ return Response.Save();
}
void
@@ -149,16 +191,35 @@ HttpSessionsService::ListSessionsRequest(HttpRouterRequest& Req)
m_SessionsStats.SessionListCount++;
m_SessionsStats.RequestCount++;
- std::vector<Ref<SessionsService::Session>> Sessions = m_Sessions.GetSessions();
+ HttpServerRequest::QueryParams Params = ServerRequest.GetQueryParams();
+ std::string_view Status = Params.GetValue("status"sv);
+
+ std::vector<Ref<SessionsService::Session>> Sessions;
+ if (Status == "ended"sv)
+ {
+ Sessions = m_Sessions.GetEndedSessions();
+ }
+ else if (Status == "all"sv)
+ {
+ Sessions = m_Sessions.GetSessions();
+ std::vector<Ref<SessionsService::Session>> Ended = m_Sessions.GetEndedSessions();
+ Sessions.insert(Sessions.end(), Ended.begin(), Ended.end());
+ }
+ else
+ {
+ Sessions = m_Sessions.GetSessions();
+ }
CbObjectWriter Response;
+ if (m_SelfSessionId != Oid::Zero)
+ {
+ Response << "self_id" << m_SelfSessionId;
+ }
Response.BeginArray("sessions");
for (const Ref<SessionsService::Session>& Session : Sessions)
{
Response.BeginObject();
- {
- WriteSessionInfo(Response, Session->Info());
- }
+ WriteSessionInfo(Response, Session->Info());
Response.EndObject();
}
Response.EndArray();
@@ -187,30 +248,17 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
case HttpVerb::kPost:
case HttpVerb::kPut:
{
- IoBuffer Payload = ServerRequest.ReadPayload();
- CbObject RequestObject;
-
- if (Payload.GetSize() > 0)
- {
- if (CbValidateError ValidationResult = ValidateCompactBinary(Payload.GetView(), CbValidateMode::All);
- ValidationResult != CbValidateError::None)
- {
- m_SessionsStats.BadRequestCount++;
- return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- fmt::format("Invalid payload: {}", zen::ToString(ValidationResult)));
- }
- RequestObject = LoadCompactBinaryObject(Payload);
- }
+ CbObject RequestObject = ServerRequest.ReadPayloadObject();
if (ServerRequest.RequestVerb() == HttpVerb::kPost)
{
std::string AppName(RequestObject["appname"sv].AsString());
+ std::string Mode(RequestObject["mode"sv].AsString());
Oid JobId = RequestObject["jobid"sv].AsObjectId();
CbObjectView MetadataView = RequestObject["metadata"sv].AsObjectView();
m_SessionsStats.SessionWriteCount++;
- if (m_Sessions.RegisterSession(SessionId, std::move(AppName), JobId, MetadataView))
+ if (m_Sessions.RegisterSession(SessionId, std::move(AppName), std::move(Mode), JobId, MetadataView))
{
return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", SessionId));
}
@@ -265,4 +313,73 @@ HttpSessionsService::SessionRequest(HttpRouterRequest& Req)
}
}
+//////////////////////////////////////////////////////////////////////////
+//
+// WebSocket push
+//
+
+void
+HttpSessionsService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_INFO("Sessions WebSocket client connected");
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+}
+
+void
+HttpSessionsService::OnWebSocketMessage(WebSocketConnection& /*Conn*/, const WebSocketMessage& /*Msg*/)
+{
+ // No client-to-server messages expected
+}
+
+void
+HttpSessionsService::OnWebSocketClose(WebSocketConnection& Conn, [[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_INFO("Sessions WebSocket client disconnected (code {})", Code);
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
+
+void
+HttpSessionsService::BroadcastSessions()
+{
+ std::vector<Ref<WebSocketConnection>> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = m_WsConnections; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ ExtendableStringBuilder<4096> JsonBuilder;
+ BuildSessionListResponse().ToJson(JsonBuilder);
+ std::string_view Json = JsonBuilder.ToView();
+
+ for (const Ref<WebSocketConnection>& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendText(Json);
+ }
+ }
+}
+
+void
+HttpSessionsService::EnqueuePushTimer()
+{
+ m_PushTimer.expires_after(std::chrono::seconds(2));
+ m_PushTimer.async_wait([this](const asio::error_code& Ec) {
+ if (Ec)
+ {
+ return;
+ }
+
+ BroadcastSessions();
+ EnqueuePushTimer();
+ });
+}
+
} // namespace zen