package server

import (
	"encoding/json"
	"fmt"
	"log/slog"
	"net/http"
	"sync"
	"time"
)

// SSEHub manages Server-Sent Events connections and broadcasts.
//
// mu guards clients and closed. closedCh is closed exactly once by Close and
// is used by ServeHTTP loops to notice that the hub has shut down.
type SSEHub struct {
	mu       sync.RWMutex
	clients  map[*sseClient]struct{}
	logger   *slog.Logger
	closed   bool
	closedCh chan struct{}
}

// sseClient is the per-connection state tracked by the hub.
type sseClient struct {
	ch       chan []byte   // JSON payloads queued by Broadcast for this connection
	doneCh   chan struct{} // closed by SSEHub.Close; not received from in this file
	clientIP string        // r.RemoteAddr of the connection, used in log output
}

// NewSSEHub creates a new SSE hub for managing client connections.
//
// A nil logger is replaced with slog.Default() so that the hub's logging
// calls never dereference a nil pointer.
func NewSSEHub(logger *slog.Logger) *SSEHub {
	if logger == nil {
		logger = slog.Default()
	}
	return &SSEHub{
		clients:  make(map[*sseClient]struct{}),
		logger:   logger,
		closedCh: make(chan struct{}),
	}
}

// ServeHTTP handles SSE connections.
//
// It registers the connection with the hub, emits an initial "connected"
// event, and then streams "update" events queued by Broadcast, interleaved
// with a keepalive comment every 30 seconds. The handler returns when the
// request context is done, when the hub is closed, or when a write to the
// client fails. If w does not implement http.Flusher the request is rejected
// with 500.
func (h *SSEHub) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Streaming needs every frame pushed to the wire immediately.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "SSE not supported", http.StatusInternalServerError)
		return
	}

	// Set SSE headers.
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("X-Accel-Buffering", "no") // Disable nginx buffering

	client := &sseClient{
		ch:       make(chan []byte, 64), // Buffered so Broadcast does not block on a slow reader
		doneCh:   make(chan struct{}),
		clientIP: r.RemoteAddr,
	}

	h.addClient(client)
	defer h.removeClient(client)

	h.logger.Debug("SSE client connected", "remote_addr", r.RemoteAddr)

	// send writes one SSE frame and flushes it. It reports false when the
	// write failed, in which case the connection is unusable and the handler
	// should stop instead of formatting further frames for a dead peer.
	send := func(format string, args ...any) bool {
		if _, err := fmt.Fprintf(w, format, args...); err != nil {
			h.logger.Debug("SSE write failed, dropping client", "remote_addr", r.RemoteAddr, "error", err)
			return false
		}
		flusher.Flush()
		return true
	}

	// Send initial connection event.
	if !send("event: connected\ndata: {\"status\":\"connected\"}\n\n") {
		return
	}

	// Keepalive ticker (send a comment every 30s to keep the connection alive).
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			h.logger.Debug("SSE client disconnected", "remote_addr", r.RemoteAddr)
			return
		case <-h.closedCh:
			// Hub is closing.
			return
		case data := <-client.ch:
			if !send("event: update\ndata: %s\n\n", data) {
				return
			}
		case <-ticker.C:
			// A line starting with ':' is an SSE comment, not an event.
			if !send(": keepalive\n\n") {
				return
			}
		}
	}
}

// Broadcast sends data to all connected clients.
//
// data is encoded once with json.Marshal and the resulting bytes are offered
// to every registered client without blocking: a client whose buffer is full
// misses this update. Nothing is encoded or sent when the hub is closed or
// has no clients. Marshal failures are logged and the broadcast is dropped.
func (h *SSEHub) Broadcast(data any) {
	// Cheap pre-check so nothing is encoded when nobody is listening.
	h.mu.RLock()
	idle := h.closed || len(h.clients) == 0
	h.mu.RUnlock()
	if idle {
		return
	}

	// Encode outside the lock so that a large payload does not stall
	// addClient, removeClient or Close while it is being marshalled.
	payload, err := json.Marshal(data)
	if err != nil {
		h.logger.Error("failed to marshal SSE broadcast data", "error", err)
		return
	}

	h.mu.RLock()
	// The hub may have been closed or emptied while the lock was released.
	recipients := len(h.clients)
	if h.closed || recipients == 0 {
		h.mu.RUnlock()
		return
	}
	// The read lock is held across the sends so that removeClient (which
	// closes client.ch under the write lock) cannot run concurrently.
	for c := range h.clients {
		select {
		case c.ch <- payload:
			// Queued for delivery.
		default:
			// Client buffer full, skip this update.
			h.logger.Debug("SSE client buffer full, skipping update", "remote_addr", c.clientIP)
		}
	}
	h.mu.RUnlock()

	h.logger.Debug("SSE broadcast sent", "clients", recipients, "bytes", len(payload))
}

// ClientCount returns the number of connected clients.
func (h *SSEHub) ClientCount() int {
	h.mu.RLock()
	n := len(h.clients)
	h.mu.RUnlock()
	return n
}

// Close shuts down the hub and disconnects all clients.
//
// It is safe to call more than once; only the first call has any effect.
// Closing closedCh is what makes running ServeHTTP loops return.
func (h *SSEHub) Close() {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.closed {
		return
	}
	h.closed = true
	close(h.closedCh)

	// Signal every registered client, then drop the registry.
	for c := range h.clients {
		close(c.doneCh)
	}
	h.clients = nil

	h.logger.Info("SSE hub closed")
}

// addClient registers client with the hub. It is a no-op once the hub has
// been closed.
func (h *SSEHub) addClient(client *sseClient) {
	h.mu.Lock()
	defer h.mu.Unlock()

	if h.closed {
		return
	}
	h.clients[client] = struct{}{}
	h.logger.Debug("SSE client registered", "total_clients", len(h.clients))
}

// removeClient unregisters client and closes its payload channel. Clients
// that are not (or no longer) registered are left untouched, so client.ch is
// closed at most once.
func (h *SSEHub) removeClient(client *sseClient) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Reading from the nil map left behind by Close is safe and reports !ok.
	if _, ok := h.clients[client]; !ok {
		return
	}
	delete(h.clients, client)
	close(client.ch)
	h.logger.Debug("SSE client unregistered", "total_clients", len(h.clients))
}

// SSEPageData is the data structure sent via SSE for page updates.
type SSEPageData struct {
	Type          string                    `json:"type"` // "init" for initial load, "update" for changes
	Monitors      map[string]APIMonitorData `json:"monitors"`
	OverallStatus string                    `json:"overall_status"`
	Counts        StatusCounts              `json:"counts"`
	LastUpdated   time.Time                 `json:"last_updated"`

	// Additional fields for initial load
	Groups    []SSEGroupData    `json:"groups,omitempty"`
	Incidents []SSEIncidentData `json:"incidents,omitempty"`
	Site      *SSESiteData      `json:"site,omitempty"`
}

// SSEGroupData contains group info for initial SSE load.
type SSEGroupData struct {
	Name             string   `json:"name"`
	MonitorIDs       []string `json:"monitor_ids"`
	DefaultCollapsed bool     `json:"default_collapsed"`
	ShowGroupUptime  bool     `json:"show_group_uptime"`
	GroupUptime      float64  `json:"group_uptime"`
}

// SSEIncidentData contains incident info for SSE.
type SSEIncidentData struct {
	Title          string     `json:"title"`
	Status         string     `json:"status"`
	Message        string     `json:"message"`
	IsActive       bool       `json:"is_active"`
	IsScheduled    bool       `json:"is_scheduled"`
	ScheduledStart *time.Time `json:"scheduled_start,omitempty"`
	ScheduledEnd   *time.Time `json:"scheduled_end,omitempty"`
	CreatedAt      *time.Time `json:"created_at,omitempty"`
}

// SSESiteData contains site info for initial SSE load.
type SSESiteData struct {
	Name        string `json:"name"`
	Description string `json:"description"`
}