diff options
| author | Fuwn <[email protected]> | 2026-01-28 03:12:23 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-01-28 03:12:23 -0800 |
| commit | defea76033f75804a45bbb650e19cb16c677295b (patch) | |
| tree | cfc507fb0d782f02925c001429df7d3c1d079420 /internal/server/server.go | |
| parent | fix: Handle libsql string-based time values (diff) | |
| download | kaze-defea76033f75804a45bbb650e19cb16c677295b.tar.xz kaze-defea76033f75804a45bbb650e19cb16c677295b.zip | |
feat: Add SSE streaming for instant page load and real-time updates
New refresh_mode 'stream' eliminates blocking database queries from initial
page load. Page renders instantly with skeleton UI, then hydrates via SSE.
- Add SSE hub for managing client connections and broadcasting
- Add /api/stream endpoint with init and update events
- Add stream.html skeleton template with loading animations
- Wire scheduler to broadcast on check completion
- Backwards compatible: page/api modes unchanged
Diffstat (limited to 'internal/server/server.go')
| -rw-r--r-- | internal/server/server.go | 335 |
1 files changed, 330 insertions, 5 deletions
diff --git a/internal/server/server.go b/internal/server/server.go index 9296647..e0b0b15 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -50,6 +50,7 @@ type Server struct { templates *template.Template reloadConfig ReloadFunc version VersionInfo + sseHub *SSEHub // SSE hub for real-time streaming } // New creates a new HTTP server @@ -66,6 +67,7 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l scheduler: sched, logger: logger, templates: tmpl, + sseHub: NewSSEHub(logger), } // Setup routes @@ -106,6 +108,13 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l // Config reload endpoint - always requires authentication mux.HandleFunc("POST /api/reload", s.withStrictAuth(s.handleAPIReload)) + // SSE stream endpoint - public for stream mode, otherwise follows api.access + if cfg.Display.RefreshMode == "stream" { + mux.HandleFunc("GET /api/stream", s.handleAPIStream) + } else { + mux.HandleFunc("GET /api/stream", s.withAPIAuth(s.handleAPIStream)) + } + // Create HTTP server s.server = &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port), @@ -295,12 +304,15 @@ type IncidentUpdateData struct { Message string } -// handleIndex renders the main status page func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { ctx := r.Context() cfg := s.getConfig() - // Get all monitor stats + if cfg.Display.RefreshMode == "stream" { + s.handleIndexStream(w, r) + return + } + stats, err := s.storage.GetAllMonitorStats(ctx) if err != nil { s.logger.Error("failed to get monitor stats", "error", err) @@ -502,15 +514,132 @@ func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) { return } - // Write buffered response atomically w.Header().Set("Content-Type", "text/html; charset=utf-8") if _, err := w.Write(buf.Bytes()); err != nil { - // Client likely disconnected, just log it s.logger.Debug("failed to write response", "error", err) } } -// handleAPIStatus returns JSON status for all monitors +func (s *Server) handleIndexStream(w http.ResponseWriter, r *http.Request) { + cfg := s.getConfig() + + var themeCSS template.CSS + if cfg.Site.ThemeURL != "" { + resolvedTheme, err := theme.LoadTheme(cfg.Site.ThemeURL) + if err != nil { + s.logger.Warn("failed to load theme", "url", cfg.Site.ThemeURL, "error", err) + } else if resolvedTheme != nil { + cssString := resolvedTheme.GenerateCSS() + resolvedTheme.GenerateVariableOverrides() + themeCSS = template.CSS(cssString) + } + } + + data := StreamPageData{ + Site: cfg.Site, + TickMode: cfg.Display.TickMode, + TickCount: cfg.Display.TickCount, + Timezone: cfg.Display.Timezone, + UseBrowserTimezone: cfg.Display.Timezone == "Browser", + ThemeCSS: themeCSS, + CustomHead: template.HTML(cfg.Site.CustomHead), + Scale: cfg.Display.Scale, + VersionTooltip: s.formatVersionTooltip(), + Groups: make([]StreamGroupData, 0, len(cfg.Groups)), + } + + for _, group := range cfg.Groups { + gd := StreamGroupData{ + Name: group.Name, + DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed, + ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime, + Monitors: make([]StreamMonitorData, 0, len(group.Monitors)), + } + + for _, monCfg := range group.Monitors { + gd.Monitors = append(gd.Monitors, StreamMonitorData{ + ID: monCfg.ID(), + Name: monCfg.Name, + Type: monCfg.Type, + Link: template.URL(monCfg.Link), + HidePing: monCfg.HidePing, + DisablePingTooltips: monCfg.DisablePingTooltips, + }) + } + + data.Groups = append(data.Groups, gd) + } + + for _, inc := range cfg.Incidents { + isActive := inc.Status != "resolved" + id := StreamIncidentData{ + Title: inc.Title, + Status: inc.Status, + StatusClass: incidentStatusToClass(inc.Status), + Message: inc.Message, + ScheduledStart: inc.ScheduledStart, + ScheduledEnd: inc.ScheduledEnd, + CreatedAt: inc.CreatedAt, + IsScheduled: inc.Status == "scheduled", + IsActive: isActive, + } + data.Incidents = append(data.Incidents, id) + } + + var buf bytes.Buffer + if err := s.templates.ExecuteTemplate(&buf, "stream.html", data); err != nil { + s.logger.Error("failed to render stream template", "error", err) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if _, err := w.Write(buf.Bytes()); err != nil { + s.logger.Debug("failed to write response", "error", err) + } +} + +type StreamPageData struct { + Site config.SiteConfig + Groups []StreamGroupData + Incidents []StreamIncidentData + TickMode string + TickCount int + Timezone string + UseBrowserTimezone bool + ThemeCSS template.CSS + CustomHead template.HTML + Scale float64 + VersionTooltip string +} + +type StreamGroupData struct { + Name string + Monitors []StreamMonitorData + DefaultCollapsed bool + ShowGroupUptime bool +} + +type StreamMonitorData struct { + ID string + Name string + Type string + Link template.URL + HidePing bool + DisablePingTooltips bool +} + +type StreamIncidentData struct { + Title string + Status string + StatusClass string + Message string + ScheduledStart *time.Time + ScheduledEnd *time.Time + CreatedAt *time.Time + IsScheduled bool + IsActive bool +} + func (s *Server) handleAPIStatus(w http.ResponseWriter, r *http.Request) { ctx := r.Context() @@ -675,6 +804,202 @@ func (s *Server) handleAPIPage(w http.ResponseWriter, r *http.Request) { s.jsonResponse(w, response) } +func (s *Server) handleAPIStream(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + cfg := s.getConfig() + + flusher, ok := w.(http.Flusher) + if !ok { + s.jsonError(w, "SSE not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + s.logger.Debug("SSE client connected", "remote_addr", r.RemoteAddr) + + initialData := s.buildSSEPageData(ctx, cfg, "init") + s.writeSSEEvent(w, flusher, "init", initialData) + + client := &sseClient{ + ch: make(chan []byte, 64), + doneCh: make(chan struct{}), + clientIP: r.RemoteAddr, + } + s.sseHub.addClient(client) + defer s.sseHub.removeClient(client) + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + s.logger.Debug("SSE client disconnected", "remote_addr", r.RemoteAddr) + return + case data := <-client.ch: + fmt.Fprintf(w, "event: update\ndata: %s\n\n", data) + flusher.Flush() + case <-ticker.C: + fmt.Fprintf(w, ": keepalive\n\n") + flusher.Flush() + } + } +} + +func (s *Server) writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, eventType string, data any) { + jsonData, err := json.Marshal(data) + if err != nil { + s.logger.Error("failed to marshal SSE data", "error", err) + return + } + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData) + flusher.Flush() +} + +func (s *Server) buildSSEPageData(ctx context.Context, cfg *config.Config, dataType string) *SSEPageData { + stats, err := s.storage.GetAllMonitorStats(ctx) + if err != nil { + s.logger.Error("failed to get monitor stats for SSE", "error", err) + stats = make(map[string]*storage.MonitorStats) + } + + data := &SSEPageData{ + Type: dataType, + Monitors: make(map[string]APIMonitorData), + LastUpdated: time.Now(), + } + + overallUp := true + hasDegraded := false + + if dataType == "init" { + data.Site = &SSESiteData{ + Name: cfg.Site.Name, + Description: cfg.Site.Description, + } + data.Groups = make([]SSEGroupData, 0, len(cfg.Groups)) + data.Incidents = make([]SSEIncidentData, 0, len(cfg.Incidents)) + + for _, inc := range cfg.Incidents { + isActive := inc.Status != "resolved" + data.Incidents = append(data.Incidents, SSEIncidentData{ + Title: inc.Title, + Status: inc.Status, + Message: inc.Message, + IsActive: isActive, + IsScheduled: inc.Status == "scheduled", + ScheduledStart: inc.ScheduledStart, + ScheduledEnd: inc.ScheduledEnd, + CreatedAt: inc.CreatedAt, + }) + } + } + + for _, group := range cfg.Groups { + var groupData SSEGroupData + if dataType == "init" { + groupData = SSEGroupData{ + Name: group.Name, + MonitorIDs: make([]string, 0, len(group.Monitors)), + DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed, + ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime, + } + } + + var totalUptime float64 + var monitorsWithUptime int + + for _, monCfg := range group.Monitors { + monitorID := monCfg.ID() + + if dataType == "init" { + groupData.MonitorIDs = append(groupData.MonitorIDs, monitorID) + } + + stat, ok := stats[monitorID] + if !ok { + data.Monitors[monitorID] = APIMonitorData{ + Status: "unknown", + Ticks: nil, + } + continue + } + + ticks, err := s.storage.GetAggregatedHistory( + ctx, + monitorID, + cfg.Display.TickCount, + cfg.Display.TickMode, + cfg.Display.PingFixedSlots, + ) + if err != nil { + s.logger.Error("failed to get tick history", "monitor", monitorID, "error", err) + ticks = nil + } + + data.Monitors[monitorID] = APIMonitorData{ + Status: stat.CurrentStatus, + ResponseTime: stat.LastResponseTime, + Uptime: stat.UptimePercent, + LastError: stat.LastError, + SSLDaysLeft: stat.SSLDaysLeft, + Ticks: ticks, + } + + data.Counts.Total++ + switch stat.CurrentStatus { + case "down": + overallUp = false + data.Counts.Down++ + case "degraded": + hasDegraded = true + data.Counts.Degraded++ + case "up": + data.Counts.Up++ + } + + if stat.UptimePercent >= 0 { + totalUptime += stat.UptimePercent + monitorsWithUptime++ + } + } + + if dataType == "init" { + if monitorsWithUptime > 0 { + groupData.GroupUptime = totalUptime / float64(monitorsWithUptime) + } + data.Groups = append(data.Groups, groupData) + } + } + + if !overallUp { + data.OverallStatus = "Major Outage" + } else if hasDegraded { + data.OverallStatus = "Partial Outage" + } else { + data.OverallStatus = "All Systems Operational" + } + + return data +} + +func (s *Server) BroadcastStatusUpdate(ctx context.Context) { + if s.sseHub.ClientCount() == 0 { + return + } + cfg := s.getConfig() + data := s.buildSSEPageData(ctx, cfg, "update") + s.sseHub.Broadcast(data) +} + +func (s *Server) GetSSEHub() *SSEHub { + return s.sseHub +} + // handleAPIHealth returns a simple health check response (always public) func (s *Server) handleAPIHealth(w http.ResponseWriter, r *http.Request) { s.jsonResponse(w, map[string]string{"status": "ok"}) |