From defea76033f75804a45bbb650e19cb16c677295b Mon Sep 17 00:00:00 2001 From: Fuwn Date: Wed, 28 Jan 2026 03:12:23 -0800 Subject: feat: Add SSE streaming for instant page load and real-time updates New refresh_mode 'stream' eliminates blocking database queries from initial page load. Page renders instantly with skeleton UI, then hydrates via SSE. - Add SSE hub for managing client connections and broadcasting - Add /api/stream endpoint with init and update events - Add stream.html skeleton template with loading animations - Wire scheduler to broadcast on check completion - Backwards compatible: page/api modes unchanged --- internal/server/server.go | 335 +++++++++++++++++++- internal/server/sse.go | 222 ++++++++++++++ internal/server/templates/stream.html | 556 ++++++++++++++++++++++++++++++++++ 3 files changed, 1108 insertions(+), 5 deletions(-) create mode 100644 internal/server/sse.go create mode 100644 internal/server/templates/stream.html (limited to 'internal/server') diff --git a/internal/server/server.go b/internal/server/server.go index 9296647..e0b0b15 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -50,6 +50,7 @@ type Server struct { templates *template.Template reloadConfig ReloadFunc version VersionInfo + sseHub *SSEHub // SSE hub for real-time streaming } // New creates a new HTTP server @@ -66,6 +67,7 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l scheduler: sched, logger: logger, templates: tmpl, + sseHub: NewSSEHub(logger), } // Setup routes @@ -106,6 +108,13 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l // Config reload endpoint - always requires authentication mux.HandleFunc("POST /api/reload", s.withStrictAuth(s.handleAPIReload)) + // SSE stream endpoint - public for stream mode, otherwise follows api.access + if cfg.Display.RefreshMode == "stream" { + mux.HandleFunc("GET /api/stream", s.handleAPIStream) + } else { + mux.HandleFunc("GET /api/stream", s.withAPIAuth(s.handleAPIStream)) + } + // Create HTTP server s.server = &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port), @@ -295,12 +304,15 @@ type IncidentUpdateData struct { Message string } -// handleIndex renders the main status page func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { ctx := r.Context() cfg := s.getConfig() - // Get all monitor stats + if cfg.Display.RefreshMode == "stream" { + s.handleIndexStream(w, r) + return + } + stats, err := s.storage.GetAllMonitorStats(ctx) if err != nil { s.logger.Error("failed to get monitor stats", "error", err) @@ -502,15 +514,132 @@ func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { return } - // Write buffered response atomically w.Header().Set("Content-Type", "text/html; charset=utf-8") if _, err := w.Write(buf.Bytes()); err != nil { - // Client likely disconnected, just log it s.logger.Debug("failed to write response", "error", err) } } -// handleAPIStatus returns JSON status for all monitors +func (s *Server) handleIndexStream(w http.ResponseWriter, r *http.Request) { + cfg := s.getConfig() + + var themeCSS template.CSS + if cfg.Site.ThemeURL != "" { + resolvedTheme, err := theme.LoadTheme(cfg.Site.ThemeURL) + if err != nil { + s.logger.Warn("failed to load theme", "url", cfg.Site.ThemeURL, "error", err) + } else if resolvedTheme != nil { + cssString := resolvedTheme.GenerateCSS() + resolvedTheme.GenerateVariableOverrides() + themeCSS = template.CSS(cssString) + } + } + + data := StreamPageData{ + Site: cfg.Site, + TickMode: cfg.Display.TickMode, + TickCount: cfg.Display.TickCount, + Timezone: cfg.Display.Timezone, + UseBrowserTimezone: cfg.Display.Timezone == "Browser", + ThemeCSS: themeCSS, + CustomHead: template.HTML(cfg.Site.CustomHead), + Scale: cfg.Display.Scale, + VersionTooltip: s.formatVersionTooltip(), + Groups: make([]StreamGroupData, 0, len(cfg.Groups)), + } + + for _, group := range cfg.Groups { + gd := StreamGroupData{ + Name: group.Name, + DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed, + ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime, + Monitors: make([]StreamMonitorData, 0, len(group.Monitors)), + } + + for _, monCfg := range group.Monitors { + gd.Monitors = append(gd.Monitors, StreamMonitorData{ + ID: monCfg.ID(), + Name: monCfg.Name, + Type: monCfg.Type, + Link: template.URL(monCfg.Link), + HidePing: monCfg.HidePing, + DisablePingTooltips: monCfg.DisablePingTooltips, + }) + } + + data.Groups = append(data.Groups, gd) + } + + for _, inc := range cfg.Incidents { + isActive := inc.Status != "resolved" + id := StreamIncidentData{ + Title: inc.Title, + Status: inc.Status, + StatusClass: incidentStatusToClass(inc.Status), + Message: inc.Message, + ScheduledStart: inc.ScheduledStart, + ScheduledEnd: inc.ScheduledEnd, + CreatedAt: inc.CreatedAt, + IsScheduled: inc.Status == "scheduled", + IsActive: isActive, + } + data.Incidents = append(data.Incidents, id) + } + + var buf bytes.Buffer + if err := s.templates.ExecuteTemplate(&buf, "stream.html", data); err != nil { + s.logger.Error("failed to render stream template", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if _, err := w.Write(buf.Bytes()); err != nil { + s.logger.Debug("failed to write response", "error", err) + } +} + +type StreamPageData struct { + Site config.SiteConfig + Groups []StreamGroupData + Incidents []StreamIncidentData + TickMode string + TickCount int + Timezone string + UseBrowserTimezone bool + ThemeCSS template.CSS + CustomHead template.HTML + Scale float64 + VersionTooltip string +} + +type StreamGroupData struct { + Name string + Monitors []StreamMonitorData + DefaultCollapsed bool + ShowGroupUptime bool +} + +type StreamMonitorData struct { + ID string + Name string + Type string + Link template.URL + HidePing bool + DisablePingTooltips bool +} + +type StreamIncidentData struct { + Title string + Status string + StatusClass string + Message string + ScheduledStart *time.Time + ScheduledEnd *time.Time + CreatedAt *time.Time + IsScheduled bool + IsActive bool +} + func (s *Server) handleAPIStatus(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -675,6 +804,202 @@ func (s *Server) handleAPIPage(w http.ResponseWriter, r *http.Request) { s.jsonResponse(w, response) } +func (s *Server) handleAPIStream(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + cfg := s.getConfig() + + flusher, ok := w.(http.Flusher) + if !ok { + s.jsonError(w, "SSE not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + s.logger.Debug("SSE client connected", "remote_addr", r.RemoteAddr) + + initialData := s.buildSSEPageData(ctx, cfg, "init") + s.writeSSEEvent(w, flusher, "init", initialData) + + client := &sseClient{ + ch: make(chan []byte, 64), + doneCh: make(chan struct{}), + clientIP: r.RemoteAddr, + } + s.sseHub.addClient(client) + defer s.sseHub.removeClient(client) + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.logger.Debug("SSE client disconnected", "remote_addr", r.RemoteAddr) + return + case data := <-client.ch: + fmt.Fprintf(w, "event: update\ndata: %s\n\n", data) + flusher.Flush() + case <-ticker.C: + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + } + } +} + +func (s *Server) writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, eventType string, data any) { + jsonData, err := json.Marshal(data) + if err != nil { + s.logger.Error("failed to marshal SSE data", "error", err) + return + } + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData) + flusher.Flush() +} + +func (s *Server) buildSSEPageData(ctx context.Context, cfg *config.Config, dataType string) *SSEPageData { + stats, err := s.storage.GetAllMonitorStats(ctx) + if err != nil { + s.logger.Error("failed to get monitor stats for SSE", "error", err) + stats = make(map[string]*storage.MonitorStats) + } + + data := &SSEPageData{ + Type: dataType, + Monitors: make(map[string]APIMonitorData), + LastUpdated: time.Now(), + } + + overallUp := true + hasDegraded := false + + if dataType == "init" { + data.Site = &SSESiteData{ + Name: cfg.Site.Name, + Description: cfg.Site.Description, + } + data.Groups = make([]SSEGroupData, 0, len(cfg.Groups)) + data.Incidents = make([]SSEIncidentData, 0, len(cfg.Incidents)) + + for _, inc := range cfg.Incidents { + isActive := inc.Status != "resolved" + data.Incidents = append(data.Incidents, SSEIncidentData{ + Title: inc.Title, + Status: inc.Status, + Message: inc.Message, + IsActive: isActive, + IsScheduled: inc.Status == "scheduled", + ScheduledStart: inc.ScheduledStart, + ScheduledEnd: inc.ScheduledEnd, + CreatedAt: inc.CreatedAt, + }) + } + } + + for _, group := range cfg.Groups { + var groupData SSEGroupData + if dataType == "init" { + groupData = SSEGroupData{ + Name: group.Name, + MonitorIDs: make([]string, 0, len(group.Monitors)), + DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed, + ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime, + } + } + + var totalUptime float64 + var monitorsWithUptime int + + for _, monCfg := range group.Monitors { + monitorID := monCfg.ID() + + if dataType == "init" { + groupData.MonitorIDs = append(groupData.MonitorIDs, monitorID) + } + + stat, ok := stats[monitorID] + if !ok { + data.Monitors[monitorID] = APIMonitorData{ + Status: "unknown", + Ticks: nil, + } + continue + } + + ticks, err := s.storage.GetAggregatedHistory( + ctx, + monitorID, + cfg.Display.TickCount, + cfg.Display.TickMode, + cfg.Display.PingFixedSlots, + ) + if err != nil { + s.logger.Error("failed to get tick history", "monitor", monitorID, "error", err) + ticks = nil + } + + data.Monitors[monitorID] = APIMonitorData{ + Status: stat.CurrentStatus, + ResponseTime: stat.LastResponseTime, + Uptime: stat.UptimePercent, + LastError: stat.LastError, + SSLDaysLeft: stat.SSLDaysLeft, + Ticks: ticks, + } + + data.Counts.Total++ + switch stat.CurrentStatus { + case "down": + overallUp = false + data.Counts.Down++ + case "degraded": + hasDegraded = true + data.Counts.Degraded++ + case "up": + data.Counts.Up++ + } + + if stat.UptimePercent >= 0 { + totalUptime += stat.UptimePercent + monitorsWithUptime++ + } + } + + if dataType == "init" { + if monitorsWithUptime > 0 { + groupData.GroupUptime = totalUptime / float64(monitorsWithUptime) + } + data.Groups = append(data.Groups, groupData) + } + } + + if !overallUp { + data.OverallStatus = "Major Outage" + } else if hasDegraded { + data.OverallStatus = "Partial Outage" + } else { + data.OverallStatus = "All Systems Operational" + } + + return data +} + +func (s *Server) BroadcastStatusUpdate(ctx context.Context) { + if s.sseHub.ClientCount() == 0 { + return + } + cfg := s.getConfig() + data := s.buildSSEPageData(ctx, cfg, "update") + s.sseHub.Broadcast(data) +} + +func (s *Server) GetSSEHub() *SSEHub { + return s.sseHub +} + // handleAPIHealth returns a simple health check response (always public) func (s *Server) handleAPIHealth(w http.ResponseWriter, r *http.Request) { s.jsonResponse(w, map[string]string{"status": "ok"}) diff --git a/internal/server/sse.go b/internal/server/sse.go new file mode 100644 index 0000000..f95cd70 --- /dev/null +++ b/internal/server/sse.go @@ -0,0 +1,222 @@ +package server + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + "sync" + "time" +) + +// SSEHub manages Server-Sent Events connections and broadcasts +type SSEHub struct { + mu sync.RWMutex + clients map[*sseClient]struct{} + logger *slog.Logger + closed bool + closedCh chan struct{} +} + +type sseClient struct { + ch chan []byte + doneCh chan struct{} + clientIP string +} + +// NewSSEHub creates a new SSE hub for managing client connections +func NewSSEHub(logger *slog.Logger) *SSEHub { + return &SSEHub{ + clients: make(map[*sseClient]struct{}), + logger: logger, + closedCh: make(chan struct{}), + } +} + +// ServeHTTP handles SSE connections +func (h *SSEHub) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Check if the response writer supports flushing + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "SSE not supported", http.StatusInternalServerError) + return + } + + // Set SSE headers + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering + + // Create client + client := &sseClient{ + ch: make(chan []byte, 64), // Buffered to prevent blocking broadcasts + doneCh: make(chan struct{}), + clientIP: r.RemoteAddr, + } + + // Register client + h.addClient(client) + defer h.removeClient(client) + + h.logger.Debug("SSE client connected", "remote_addr", r.RemoteAddr) + + // Send initial connection event + fmt.Fprintf(w, "event: connected\ndata: {\"status\":\"connected\"}\n\n") + flusher.Flush() + + // Keepalive ticker (send comment every 30s to keep connection alive) + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // Stream events to client + for { + select { + case <-r.Context().Done(): + h.logger.Debug("SSE client disconnected", "remote_addr", r.RemoteAddr) + return + + case <-h.closedCh: + // Hub is closing + return + + case data := <-client.ch: + // Send data event + fmt.Fprintf(w, "event: update\ndata: %s\n\n", data) + flusher.Flush() + + case <-ticker.C: + // Send keepalive comment (not an event, just keeps connection alive) + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + } + } +} + +// Broadcast sends data to all connected clients +func (h *SSEHub) Broadcast(data any) { + h.mu.RLock() + if h.closed { + h.mu.RUnlock() + return + } + + clientCount := len(h.clients) + if clientCount == 0 { + h.mu.RUnlock() + return + } + + // Marshal data to JSON + jsonData, err := json.Marshal(data) + if err != nil { + h.mu.RUnlock() + h.logger.Error("failed to marshal SSE broadcast data", "error", err) + return + } + + // Send to all clients (non-blocking) + for client := range h.clients { + select { + case client.ch <- jsonData: + // Sent successfully + default: + // Client buffer full, skip this update + h.logger.Debug("SSE client buffer full, skipping update", "remote_addr", client.clientIP) + } + } + h.mu.RUnlock() + + h.logger.Debug("SSE broadcast sent", "clients", clientCount, "bytes", len(jsonData)) +} + +// ClientCount returns the number of connected clients +func (h *SSEHub) ClientCount() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + +// Close shuts down the hub and disconnects all clients +func (h *SSEHub) Close() { + h.mu.Lock() + defer h.mu.Unlock() + + if h.closed { + return + } + + h.closed = true + close(h.closedCh) + + // Close all client channels + for client := range h.clients { + close(client.doneCh) + } + h.clients = nil + + h.logger.Info("SSE hub closed") +} + +func (h *SSEHub) addClient(client *sseClient) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.closed { + return + } + + h.clients[client] = struct{}{} + h.logger.Debug("SSE client registered", "total_clients", len(h.clients)) +} + +func (h *SSEHub) removeClient(client *sseClient) { + h.mu.Lock() + defer h.mu.Unlock() + + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.ch) + h.logger.Debug("SSE client unregistered", "total_clients", len(h.clients)) + } +} + +// SSEPageData is the data structure sent via SSE for page updates +type SSEPageData struct { + Type string `json:"type"` // "init" for initial load, "update" for changes + Monitors map[string]APIMonitorData `json:"monitors"` + OverallStatus string `json:"overall_status"` + Counts StatusCounts `json:"counts"` + LastUpdated time.Time `json:"last_updated"` + // Additional fields for initial load + Groups []SSEGroupData `json:"groups,omitempty"` + Incidents []SSEIncidentData `json:"incidents,omitempty"` + Site *SSESiteData `json:"site,omitempty"` +} + +// SSEGroupData contains group info for initial SSE load +type SSEGroupData struct { + Name string `json:"name"` + MonitorIDs []string `json:"monitor_ids"` + DefaultCollapsed bool `json:"default_collapsed"` + ShowGroupUptime bool `json:"show_group_uptime"` + GroupUptime float64 `json:"group_uptime"` +} + +// SSEIncidentData contains incident info for SSE +type SSEIncidentData struct { + Title string `json:"title"` + Status string `json:"status"` + Message string `json:"message"` + IsActive bool `json:"is_active"` + IsScheduled bool `json:"is_scheduled"` + ScheduledStart *time.Time `json:"scheduled_start,omitempty"` + ScheduledEnd *time.Time `json:"scheduled_end,omitempty"` + CreatedAt *time.Time `json:"created_at,omitempty"` +} + +// SSESiteData contains site info for initial SSE load +type SSESiteData struct { + Name string `json:"name"` + Description string `json:"description"` +} diff --git a/internal/server/templates/stream.html b/internal/server/templates/stream.html new file mode 100644 index 0000000..2e40d3b --- /dev/null +++ b/internal/server/templates/stream.html @@ -0,0 +1,556 @@ + + + + + {{if .ThemeCSS}} + + {{end}} + + + {{.Site.Name}} [Loading...] + + {{if .Site.Favicon}} + + {{else}} + + {{end}} + + {{if .CustomHead}}{{.CustomHead}}{{end}} + + + +
+
+
+
+ {{if .Site.Logo}} + Logo + {{end}} +
+

{{.Site.Name}}

+

{{.Site.Description}}

+
+
+
+
+ +
+
+
+
+
+
+ Connecting... +
+ -- +
+
+ +
+ {{range $groupIndex, $group := .Groups}} +
+
+
+
+ + + +

{{$group.Name}}

+
+ {{if $group.ShowGroupUptime}} + -- + {{end}} +
+
+
+ {{range .Monitors}} +
+
+
+
+
+ {{if .Link}}{{.Name}}{{else}}{{.Name}}{{end}} + {{.Type}} +
+
+ {{if not .HidePing}}--{{end}} + + +
+
+
+ -- +
+
+
+ {{range seq $.TickCount}} +
+ {{end}} +
+
+ {{end}} +
+
+ {{end}} +
+ + {{if .Incidents}} +
+

Incidents

+
+ {{range .Incidents}} +
+
+
+
+
+ {{if eq .Status "resolved"}} + + {{else if eq .Status "scheduled"}} + + {{else}} + + {{end}} + {{.Title}} +
+

{{.Message}}

+ {{if .IsScheduled}} +

Scheduled: {{if .ScheduledStart}}{{.ScheduledStart}}{{end}} - {{if .ScheduledEnd}}{{.ScheduledEnd}}{{end}}

+ {{end}} +
+
+ {{.Status}} +
+
+
+
+ {{end}} +
+
+ {{end}} + +
+
+ Connecting... + Powered by Kaze +
+
+
+ +
+ +
+
+
+ +
+
+ ↑↓ navigate + select + esc close +
+
+
+ + + + -- cgit v1.2.3