aboutsummaryrefslogtreecommitdiff
path: root/internal/server/server.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-01-28 03:12:23 -0800
committerFuwn <[email protected]>2026-01-28 03:12:23 -0800
commitdefea76033f75804a45bbb650e19cb16c677295b (patch)
treecfc507fb0d782f02925c001429df7d3c1d079420 /internal/server/server.go
parentfix: Handle libsql string-based time values (diff)
downloadkaze-defea76033f75804a45bbb650e19cb16c677295b.tar.xz
kaze-defea76033f75804a45bbb650e19cb16c677295b.zip
feat: Add SSE streaming for instant page load and real-time updates
New refresh_mode 'stream' eliminates blocking database queries from initial page load. Page renders instantly with skeleton UI, then hydrates via SSE. - Add SSE hub for managing client connections and broadcasting - Add /api/stream endpoint with init and update events - Add stream.html skeleton template with loading animations - Wire scheduler to broadcast on check completion - Backwards compatible: page/api modes unchanged
Diffstat (limited to 'internal/server/server.go')
-rw-r--r--internal/server/server.go335
1 files changed, 330 insertions, 5 deletions
diff --git a/internal/server/server.go b/internal/server/server.go
index 9296647..e0b0b15 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -50,6 +50,7 @@ type Server struct {
templates *template.Template
reloadConfig ReloadFunc
version VersionInfo
+ sseHub *SSEHub // SSE hub for real-time streaming
}
// New creates a new HTTP server
@@ -66,6 +67,7 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l
scheduler: sched,
logger: logger,
templates: tmpl,
+ sseHub: NewSSEHub(logger),
}
// Setup routes
@@ -106,6 +108,13 @@ func New(cfg *config.Config, store *storage.Storage, sched *monitor.Scheduler, l
// Config reload endpoint - always requires authentication
mux.HandleFunc("POST /api/reload", s.withStrictAuth(s.handleAPIReload))
+ // SSE stream endpoint - public for stream mode, otherwise follows api.access
+ if cfg.Display.RefreshMode == "stream" {
+ mux.HandleFunc("GET /api/stream", s.handleAPIStream)
+ } else {
+ mux.HandleFunc("GET /api/stream", s.withAPIAuth(s.handleAPIStream))
+ }
+
// Create HTTP server
s.server = &http.Server{
Addr: fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port),
@@ -295,12 +304,15 @@ type IncidentUpdateData struct {
Message string
}
-// handleIndex renders the main status page
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
cfg := s.getConfig()
- // Get all monitor stats
+ if cfg.Display.RefreshMode == "stream" {
+ s.handleIndexStream(w, r)
+ return
+ }
+
stats, err := s.storage.GetAllMonitorStats(ctx)
if err != nil {
s.logger.Error("failed to get monitor stats", "error", err)
@@ -502,15 +514,132 @@ func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
return
}
- // Write buffered response atomically
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if _, err := w.Write(buf.Bytes()); err != nil {
- // Client likely disconnected, just log it
s.logger.Debug("failed to write response", "error", err)
}
}
-// handleAPIStatus returns JSON status for all monitors
+func (s *Server) handleIndexStream(w http.ResponseWriter, r *http.Request) {
+ cfg := s.getConfig()
+
+ var themeCSS template.CSS
+ if cfg.Site.ThemeURL != "" {
+ resolvedTheme, err := theme.LoadTheme(cfg.Site.ThemeURL)
+ if err != nil {
+ s.logger.Warn("failed to load theme", "url", cfg.Site.ThemeURL, "error", err)
+ } else if resolvedTheme != nil {
+ cssString := resolvedTheme.GenerateCSS() + resolvedTheme.GenerateVariableOverrides()
+ themeCSS = template.CSS(cssString)
+ }
+ }
+
+ data := StreamPageData{
+ Site: cfg.Site,
+ TickMode: cfg.Display.TickMode,
+ TickCount: cfg.Display.TickCount,
+ Timezone: cfg.Display.Timezone,
+ UseBrowserTimezone: cfg.Display.Timezone == "Browser",
+ ThemeCSS: themeCSS,
+ CustomHead: template.HTML(cfg.Site.CustomHead),
+ Scale: cfg.Display.Scale,
+ VersionTooltip: s.formatVersionTooltip(),
+ Groups: make([]StreamGroupData, 0, len(cfg.Groups)),
+ }
+
+ for _, group := range cfg.Groups {
+ gd := StreamGroupData{
+ Name: group.Name,
+ DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed,
+ ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime,
+ Monitors: make([]StreamMonitorData, 0, len(group.Monitors)),
+ }
+
+ for _, monCfg := range group.Monitors {
+ gd.Monitors = append(gd.Monitors, StreamMonitorData{
+ ID: monCfg.ID(),
+ Name: monCfg.Name,
+ Type: monCfg.Type,
+ Link: template.URL(monCfg.Link),
+ HidePing: monCfg.HidePing,
+ DisablePingTooltips: monCfg.DisablePingTooltips,
+ })
+ }
+
+ data.Groups = append(data.Groups, gd)
+ }
+
+ for _, inc := range cfg.Incidents {
+ isActive := inc.Status != "resolved"
+ id := StreamIncidentData{
+ Title: inc.Title,
+ Status: inc.Status,
+ StatusClass: incidentStatusToClass(inc.Status),
+ Message: inc.Message,
+ ScheduledStart: inc.ScheduledStart,
+ ScheduledEnd: inc.ScheduledEnd,
+ CreatedAt: inc.CreatedAt,
+ IsScheduled: inc.Status == "scheduled",
+ IsActive: isActive,
+ }
+ data.Incidents = append(data.Incidents, id)
+ }
+
+ var buf bytes.Buffer
+ if err := s.templates.ExecuteTemplate(&buf, "stream.html", data); err != nil {
+ s.logger.Error("failed to render stream template", "error", err)
+ http.Error(w, "Internal Server Error", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ if _, err := w.Write(buf.Bytes()); err != nil {
+ s.logger.Debug("failed to write response", "error", err)
+ }
+}
+
+type StreamPageData struct {
+ Site config.SiteConfig
+ Groups []StreamGroupData
+ Incidents []StreamIncidentData
+ TickMode string
+ TickCount int
+ Timezone string
+ UseBrowserTimezone bool
+ ThemeCSS template.CSS
+ CustomHead template.HTML
+ Scale float64
+ VersionTooltip string
+}
+
+type StreamGroupData struct {
+ Name string
+ Monitors []StreamMonitorData
+ DefaultCollapsed bool
+ ShowGroupUptime bool
+}
+
+type StreamMonitorData struct {
+ ID string
+ Name string
+ Type string
+ Link template.URL
+ HidePing bool
+ DisablePingTooltips bool
+}
+
+type StreamIncidentData struct {
+ Title string
+ Status string
+ StatusClass string
+ Message string
+ ScheduledStart *time.Time
+ ScheduledEnd *time.Time
+ CreatedAt *time.Time
+ IsScheduled bool
+ IsActive bool
+}
+
func (s *Server) handleAPIStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@@ -675,6 +804,202 @@ func (s *Server) handleAPIPage(w http.ResponseWriter, r *http.Request) {
s.jsonResponse(w, response)
}
+func (s *Server) handleAPIStream(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ cfg := s.getConfig()
+
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ s.jsonError(w, "SSE not supported", http.StatusInternalServerError)
+ return
+ }
+
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("X-Accel-Buffering", "no")
+
+ s.logger.Debug("SSE client connected", "remote_addr", r.RemoteAddr)
+
+ initialData := s.buildSSEPageData(ctx, cfg, "init")
+ s.writeSSEEvent(w, flusher, "init", initialData)
+
+ client := &sseClient{
+ ch: make(chan []byte, 64),
+ doneCh: make(chan struct{}),
+ clientIP: r.RemoteAddr,
+ }
+ s.sseHub.addClient(client)
+ defer s.sseHub.removeClient(client)
+
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ s.logger.Debug("SSE client disconnected", "remote_addr", r.RemoteAddr)
+ return
+ case data := <-client.ch:
+ fmt.Fprintf(w, "event: update\ndata: %s\n\n", data)
+ flusher.Flush()
+ case <-ticker.C:
+ fmt.Fprintf(w, ": keepalive\n\n")
+ flusher.Flush()
+ }
+ }
+}
+
+func (s *Server) writeSSEEvent(w http.ResponseWriter, flusher http.Flusher, eventType string, data any) {
+ jsonData, err := json.Marshal(data)
+ if err != nil {
+ s.logger.Error("failed to marshal SSE data", "error", err)
+ return
+ }
+ fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData)
+ flusher.Flush()
+}
+
+func (s *Server) buildSSEPageData(ctx context.Context, cfg *config.Config, dataType string) *SSEPageData {
+ stats, err := s.storage.GetAllMonitorStats(ctx)
+ if err != nil {
+ s.logger.Error("failed to get monitor stats for SSE", "error", err)
+ stats = make(map[string]*storage.MonitorStats)
+ }
+
+ data := &SSEPageData{
+ Type: dataType,
+ Monitors: make(map[string]APIMonitorData),
+ LastUpdated: time.Now(),
+ }
+
+ overallUp := true
+ hasDegraded := false
+
+ if dataType == "init" {
+ data.Site = &SSESiteData{
+ Name: cfg.Site.Name,
+ Description: cfg.Site.Description,
+ }
+ data.Groups = make([]SSEGroupData, 0, len(cfg.Groups))
+ data.Incidents = make([]SSEIncidentData, 0, len(cfg.Incidents))
+
+ for _, inc := range cfg.Incidents {
+ isActive := inc.Status != "resolved"
+ data.Incidents = append(data.Incidents, SSEIncidentData{
+ Title: inc.Title,
+ Status: inc.Status,
+ Message: inc.Message,
+ IsActive: isActive,
+ IsScheduled: inc.Status == "scheduled",
+ ScheduledStart: inc.ScheduledStart,
+ ScheduledEnd: inc.ScheduledEnd,
+ CreatedAt: inc.CreatedAt,
+ })
+ }
+ }
+
+ for _, group := range cfg.Groups {
+ var groupData SSEGroupData
+ if dataType == "init" {
+ groupData = SSEGroupData{
+ Name: group.Name,
+ MonitorIDs: make([]string, 0, len(group.Monitors)),
+ DefaultCollapsed: group.DefaultCollapsed != nil && *group.DefaultCollapsed,
+ ShowGroupUptime: group.ShowGroupUptime == nil || *group.ShowGroupUptime,
+ }
+ }
+
+ var totalUptime float64
+ var monitorsWithUptime int
+
+ for _, monCfg := range group.Monitors {
+ monitorID := monCfg.ID()
+
+ if dataType == "init" {
+ groupData.MonitorIDs = append(groupData.MonitorIDs, monitorID)
+ }
+
+ stat, ok := stats[monitorID]
+ if !ok {
+ data.Monitors[monitorID] = APIMonitorData{
+ Status: "unknown",
+ Ticks: nil,
+ }
+ continue
+ }
+
+ ticks, err := s.storage.GetAggregatedHistory(
+ ctx,
+ monitorID,
+ cfg.Display.TickCount,
+ cfg.Display.TickMode,
+ cfg.Display.PingFixedSlots,
+ )
+ if err != nil {
+ s.logger.Error("failed to get tick history", "monitor", monitorID, "error", err)
+ ticks = nil
+ }
+
+ data.Monitors[monitorID] = APIMonitorData{
+ Status: stat.CurrentStatus,
+ ResponseTime: stat.LastResponseTime,
+ Uptime: stat.UptimePercent,
+ LastError: stat.LastError,
+ SSLDaysLeft: stat.SSLDaysLeft,
+ Ticks: ticks,
+ }
+
+ data.Counts.Total++
+ switch stat.CurrentStatus {
+ case "down":
+ overallUp = false
+ data.Counts.Down++
+ case "degraded":
+ hasDegraded = true
+ data.Counts.Degraded++
+ case "up":
+ data.Counts.Up++
+ }
+
+ if stat.UptimePercent >= 0 {
+ totalUptime += stat.UptimePercent
+ monitorsWithUptime++
+ }
+ }
+
+ if dataType == "init" {
+ if monitorsWithUptime > 0 {
+ groupData.GroupUptime = totalUptime / float64(monitorsWithUptime)
+ }
+ data.Groups = append(data.Groups, groupData)
+ }
+ }
+
+ if !overallUp {
+ data.OverallStatus = "Major Outage"
+ } else if hasDegraded {
+ data.OverallStatus = "Partial Outage"
+ } else {
+ data.OverallStatus = "All Systems Operational"
+ }
+
+ return data
+}
+
+func (s *Server) BroadcastStatusUpdate(ctx context.Context) {
+ if s.sseHub.ClientCount() == 0 {
+ return
+ }
+ cfg := s.getConfig()
+ data := s.buildSSEPageData(ctx, cfg, "update")
+ s.sseHub.Broadcast(data)
+}
+
+func (s *Server) GetSSEHub() *SSEHub {
+ return s.sseHub
+}
+
// handleAPIHealth returns a simple health check response (always public)
func (s *Server) handleAPIHealth(w http.ResponseWriter, r *http.Request) {
s.jsonResponse(w, map[string]string{"status": "ok"})