aboutsummaryrefslogtreecommitdiff
path: root/internal/monitor
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-01-17 23:17:49 -0800
committerFuwn <[email protected]>2026-01-17 23:17:49 -0800
commit4bc6165258cd7b5b76ccb01aa75c7cefdc35d143 (patch)
treee7c3bb335a1efd48f82d365169e8b4a66b7abe1d /internal/monitor
downloadkaze-4bc6165258cd7b5b76ccb01aa75c7cefdc35d143.tar.xz
kaze-4bc6165258cd7b5b76ccb01aa75c7cefdc35d143.zip
feat: Initial commit
Diffstat (limited to 'internal/monitor')
-rw-r--r--internal/monitor/http.go182
-rw-r--r--internal/monitor/monitor.go86
-rw-r--r--internal/monitor/scheduler.go182
-rw-r--r--internal/monitor/tcp.go89
4 files changed, 539 insertions, 0 deletions
diff --git a/internal/monitor/http.go b/internal/monitor/http.go
new file mode 100644
index 0000000..8432401
--- /dev/null
+++ b/internal/monitor/http.go
@@ -0,0 +1,182 @@
+package monitor
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/Fuwn/kaze/internal/config"
+)
+
+// HTTPMonitor monitors HTTP and HTTPS endpoints
+type HTTPMonitor struct {
+ name string
+ monitorType string
+ target string
+ interval time.Duration
+ timeout time.Duration
+ method string
+ headers map[string]string
+ body string
+ expectedStatus int
+ verifySSL bool
+ client *http.Client
+}
+
+// NewHTTPMonitor creates a new HTTP/HTTPS monitor
+func NewHTTPMonitor(cfg config.MonitorConfig) (*HTTPMonitor, error) {
+ // Validate target URL
+ target := cfg.Target
+ if cfg.Type == "https" && !strings.HasPrefix(target, "https://") {
+ if strings.HasPrefix(target, "http://") {
+ target = strings.Replace(target, "http://", "https://", 1)
+ } else {
+ target = "https://" + target
+ }
+ } else if cfg.Type == "http" && !strings.HasPrefix(target, "http://") && !strings.HasPrefix(target, "https://") {
+ target = "http://" + target
+ }
+
+ verifySSL := true
+ if cfg.VerifySSL != nil {
+ verifySSL = *cfg.VerifySSL
+ }
+
+ // Create HTTP client with custom transport
+ transport := &http.Transport{
+ TLSClientConfig: &tls.Config{
+ InsecureSkipVerify: !verifySSL,
+ },
+ DialContext: (&net.Dialer{
+ Timeout: cfg.Timeout.Duration,
+ KeepAlive: 30 * time.Second,
+ }).DialContext,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ResponseHeaderTimeout: cfg.Timeout.Duration,
+ ExpectContinueTimeout: 1 * time.Second,
+ MaxIdleConns: 100,
+ MaxIdleConnsPerHost: 10,
+ IdleConnTimeout: 90 * time.Second,
+ }
+
+ client := &http.Client{
+ Transport: transport,
+ Timeout: cfg.Timeout.Duration,
+ CheckRedirect: func(req *http.Request, via []*http.Request) error {
+ if len(via) >= 10 {
+ return fmt.Errorf("too many redirects")
+ }
+ return nil
+ },
+ }
+
+ return &HTTPMonitor{
+ name: cfg.Name,
+ monitorType: cfg.Type,
+ target: target,
+ interval: cfg.Interval.Duration,
+ timeout: cfg.Timeout.Duration,
+ method: cfg.Method,
+ headers: cfg.Headers,
+ body: cfg.Body,
+ expectedStatus: cfg.ExpectedStatus,
+ verifySSL: verifySSL,
+ client: client,
+ }, nil
+}
+
+// Name returns the monitor's name
+func (m *HTTPMonitor) Name() string {
+ return m.name
+}
+
+// Type returns the monitor type
+func (m *HTTPMonitor) Type() string {
+ return m.monitorType
+}
+
+// Target returns the monitor target
+func (m *HTTPMonitor) Target() string {
+ return m.target
+}
+
+// Interval returns the check interval
+func (m *HTTPMonitor) Interval() time.Duration {
+ return m.interval
+}
+
+// Check performs the HTTP/HTTPS check
+func (m *HTTPMonitor) Check(ctx context.Context) *Result {
+ result := &Result{
+ MonitorName: m.name,
+ Timestamp: time.Now(),
+ }
+
+ // Create request
+ var bodyReader io.Reader
+ if m.body != "" {
+ bodyReader = strings.NewReader(m.body)
+ }
+
+ req, err := http.NewRequestWithContext(ctx, m.method, m.target, bodyReader)
+ if err != nil {
+ result.Status = StatusDown
+ result.Error = fmt.Errorf("failed to create request: %w", err)
+ return result
+ }
+
+ // Set headers
+ req.Header.Set("User-Agent", "Kaze-Monitor/1.0")
+ for key, value := range m.headers {
+ req.Header.Set(key, value)
+ }
+
+ // Perform request and measure response time
+ start := time.Now()
+ resp, err := m.client.Do(req)
+ result.ResponseTime = time.Since(start)
+
+ if err != nil {
+ result.Status = StatusDown
+ result.Error = fmt.Errorf("request failed: %w", err)
+ return result
+ }
+ defer resp.Body.Close()
+
+ // Discard body to allow connection reuse
+ io.Copy(io.Discard, resp.Body)
+
+ result.StatusCode = resp.StatusCode
+
+ // Check SSL certificate for HTTPS
+ if m.monitorType == "https" && resp.TLS != nil && len(resp.TLS.PeerCertificates) > 0 {
+ cert := resp.TLS.PeerCertificates[0]
+ result.SSLExpiry = &cert.NotAfter
+ result.SSLDaysLeft = int(time.Until(cert.NotAfter).Hours() / 24)
+ }
+
+ // Determine status based on response code
+ if resp.StatusCode == m.expectedStatus {
+ result.Status = StatusUp
+ } else if resp.StatusCode >= 200 && resp.StatusCode < 400 {
+ // Got a success code but not the expected one
+ result.Status = StatusDegraded
+ result.Error = fmt.Errorf("unexpected status code: got %d, expected %d", resp.StatusCode, m.expectedStatus)
+ } else {
+ result.Status = StatusDown
+ result.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
+ }
+
+ // Check for slow response (degraded if > 2 seconds)
+ if result.Status == StatusUp && result.ResponseTime > 2*time.Second {
+ result.Status = StatusDegraded
+ result.Error = fmt.Errorf("slow response: %v", result.ResponseTime)
+ }
+
+ return result
+}
diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go
new file mode 100644
index 0000000..4f4ab0f
--- /dev/null
+++ b/internal/monitor/monitor.go
@@ -0,0 +1,86 @@
+package monitor
+
+import (
+ "context"
+ "time"
+
+ "github.com/Fuwn/kaze/internal/config"
+ "github.com/Fuwn/kaze/internal/storage"
+)
+
+// Result represents the outcome of a monitor check
+type Result struct {
+ MonitorName string
+ Timestamp time.Time
+ Status Status
+ ResponseTime time.Duration
+ StatusCode int // HTTP status code (0 for non-HTTP)
+ Error error
+ SSLExpiry *time.Time
+ SSLDaysLeft int
+}
+
+// Status represents the status of a monitor
+type Status string
+
+const (
+ StatusUp Status = "up"
+ StatusDown Status = "down"
+ StatusDegraded Status = "degraded"
+)
+
+// Monitor is the interface that all monitor types must implement
+type Monitor interface {
+ // Name returns the monitor's name
+ Name() string
+
+ // Type returns the monitor type (http, https, tcp)
+ Type() string
+
+ // Target returns the monitor target (URL or host:port)
+ Target() string
+
+ // Interval returns the check interval
+ Interval() time.Duration
+
+ // Check performs the monitoring check and returns the result
+ Check(ctx context.Context) *Result
+}
+
+// New creates a new monitor based on the configuration
+func New(cfg config.MonitorConfig) (Monitor, error) {
+ switch cfg.Type {
+ case "http", "https":
+ return NewHTTPMonitor(cfg)
+ case "tcp":
+ return NewTCPMonitor(cfg)
+ default:
+ return nil, &UnsupportedTypeError{Type: cfg.Type}
+ }
+}
+
+// UnsupportedTypeError is returned when an unknown monitor type is specified
+type UnsupportedTypeError struct {
+ Type string
+}
+
+func (e *UnsupportedTypeError) Error() string {
+ return "unsupported monitor type: " + e.Type
+}
+
+// ToCheckResult converts a monitor Result to a storage CheckResult
+func (r *Result) ToCheckResult() *storage.CheckResult {
+ cr := &storage.CheckResult{
+ MonitorName: r.MonitorName,
+ Timestamp: r.Timestamp,
+ Status: string(r.Status),
+ ResponseTime: r.ResponseTime.Milliseconds(),
+ StatusCode: r.StatusCode,
+ SSLExpiry: r.SSLExpiry,
+ SSLDaysLeft: r.SSLDaysLeft,
+ }
+ if r.Error != nil {
+ cr.Error = r.Error.Error()
+ }
+ return cr
+}
diff --git a/internal/monitor/scheduler.go b/internal/monitor/scheduler.go
new file mode 100644
index 0000000..7a06131
--- /dev/null
+++ b/internal/monitor/scheduler.go
@@ -0,0 +1,182 @@
+package monitor
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+ "time"
+
+ "github.com/Fuwn/kaze/internal/config"
+ "github.com/Fuwn/kaze/internal/storage"
+)
+
+// Scheduler manages and runs all monitors
+type Scheduler struct {
+ monitors []Monitor
+ storage *storage.Storage
+ logger *slog.Logger
+ wg sync.WaitGroup
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+// NewScheduler creates a new monitor scheduler
+func NewScheduler(cfg *config.Config, store *storage.Storage, logger *slog.Logger) (*Scheduler, error) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ s := &Scheduler{
+ storage: store,
+ logger: logger,
+ ctx: ctx,
+ cancel: cancel,
+ }
+
+ // Create monitors from configuration
+ for _, group := range cfg.Groups {
+ for _, monCfg := range group.Monitors {
+ mon, err := New(monCfg)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ s.monitors = append(s.monitors, mon)
+ logger.Info("registered monitor",
+ "name", mon.Name(),
+ "type", mon.Type(),
+ "target", mon.Target(),
+ "interval", mon.Interval())
+ }
+ }
+
+ return s, nil
+}
+
+// Start begins running all monitors
+func (s *Scheduler) Start() {
+ s.logger.Info("starting scheduler", "monitors", len(s.monitors))
+
+ for _, mon := range s.monitors {
+ s.wg.Add(1)
+ go s.runMonitor(mon)
+ }
+
+ // Start cleanup routine
+ s.wg.Add(1)
+ go s.runCleanup()
+}
+
+// Stop gracefully stops all monitors
+func (s *Scheduler) Stop() {
+ s.logger.Info("stopping scheduler")
+ s.cancel()
+ s.wg.Wait()
+ s.logger.Info("scheduler stopped")
+}
+
+// runMonitor runs a single monitor in a loop
+func (s *Scheduler) runMonitor(mon Monitor) {
+ defer s.wg.Done()
+
+ // Run immediately on start
+ s.executeCheck(mon)
+
+ ticker := time.NewTicker(mon.Interval())
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-s.ctx.Done():
+ s.logger.Info("monitor stopped", "name", mon.Name())
+ return
+ case <-ticker.C:
+ s.executeCheck(mon)
+ }
+ }
+}
+
+// executeCheck performs a single check and saves the result
+func (s *Scheduler) executeCheck(mon Monitor) {
+ // Create a context with timeout for this check
+ checkCtx, cancel := context.WithTimeout(s.ctx, mon.Interval())
+ defer cancel()
+
+ result := mon.Check(checkCtx)
+
+ // Log the result
+ logAttrs := []any{
+ "name", mon.Name(),
+ "status", result.Status,
+ "response_time", result.ResponseTime,
+ }
+ if result.StatusCode > 0 {
+ logAttrs = append(logAttrs, "status_code", result.StatusCode)
+ }
+ if result.SSLDaysLeft > 0 {
+ logAttrs = append(logAttrs, "ssl_days_left", result.SSLDaysLeft)
+ }
+ if result.Error != nil {
+ logAttrs = append(logAttrs, "error", result.Error)
+ }
+
+ if result.Status == StatusUp {
+ s.logger.Debug("check completed", logAttrs...)
+ } else {
+ s.logger.Warn("check completed", logAttrs...)
+ }
+
+ // Save to storage
+ if err := s.storage.SaveCheckResult(s.ctx, result.ToCheckResult()); err != nil {
+ s.logger.Error("failed to save check result",
+ "name", mon.Name(),
+ "error", err)
+ }
+}
+
+// runCleanup periodically cleans up old data
+func (s *Scheduler) runCleanup() {
+ defer s.wg.Done()
+
+ // Run cleanup daily
+ ticker := time.NewTicker(24 * time.Hour)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-s.ctx.Done():
+ return
+ case <-ticker.C:
+ s.logger.Info("running database cleanup")
+ if err := s.storage.Cleanup(s.ctx); err != nil {
+ s.logger.Error("cleanup failed", "error", err)
+ } else {
+ s.logger.Info("cleanup completed")
+ }
+ }
+ }
+}
+
+// GetMonitors returns all registered monitors
+func (s *Scheduler) GetMonitors() []Monitor {
+ return s.monitors
+}
+
+// RunCheck manually triggers a check for a specific monitor
+func (s *Scheduler) RunCheck(name string) *Result {
+ for _, mon := range s.monitors {
+ if mon.Name() == name {
+ ctx, cancel := context.WithTimeout(context.Background(), mon.Interval())
+ defer cancel()
+ result := mon.Check(ctx)
+
+ // Save the result
+ if err := s.storage.SaveCheckResult(context.Background(), result.ToCheckResult()); err != nil {
+ s.logger.Error("failed to save manual check result",
+ "name", mon.Name(),
+ "error", err)
+ }
+
+ return result
+ }
+ }
+ return nil
+}
diff --git a/internal/monitor/tcp.go b/internal/monitor/tcp.go
new file mode 100644
index 0000000..f93ae10
--- /dev/null
+++ b/internal/monitor/tcp.go
@@ -0,0 +1,89 @@
+package monitor
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/Fuwn/kaze/internal/config"
+)
+
+// TCPMonitor monitors TCP endpoints
+type TCPMonitor struct {
+ name string
+ target string
+ interval time.Duration
+ timeout time.Duration
+}
+
+// NewTCPMonitor creates a new TCP monitor
+func NewTCPMonitor(cfg config.MonitorConfig) (*TCPMonitor, error) {
+ // Validate target format (should be host:port)
+ _, _, err := net.SplitHostPort(cfg.Target)
+ if err != nil {
+ return nil, fmt.Errorf("invalid TCP target %q: must be host:port format: %w", cfg.Target, err)
+ }
+
+ return &TCPMonitor{
+ name: cfg.Name,
+ target: cfg.Target,
+ interval: cfg.Interval.Duration,
+ timeout: cfg.Timeout.Duration,
+ }, nil
+}
+
+// Name returns the monitor's name
+func (m *TCPMonitor) Name() string {
+ return m.name
+}
+
+// Type returns the monitor type
+func (m *TCPMonitor) Type() string {
+ return "tcp"
+}
+
+// Target returns the monitor target
+func (m *TCPMonitor) Target() string {
+ return m.target
+}
+
+// Interval returns the check interval
+func (m *TCPMonitor) Interval() time.Duration {
+ return m.interval
+}
+
+// Check performs the TCP connection check
+func (m *TCPMonitor) Check(ctx context.Context) *Result {
+ result := &Result{
+ MonitorName: m.name,
+ Timestamp: time.Now(),
+ }
+
+ // Create a dialer with timeout
+ dialer := &net.Dialer{
+ Timeout: m.timeout,
+ }
+
+ // Attempt to connect
+ start := time.Now()
+ conn, err := dialer.DialContext(ctx, "tcp", m.target)
+ result.ResponseTime = time.Since(start)
+
+ if err != nil {
+ result.Status = StatusDown
+ result.Error = fmt.Errorf("connection failed: %w", err)
+ return result
+ }
+ defer conn.Close()
+
+ result.Status = StatusUp
+
+ // Check for slow response (degraded if > 1 second for TCP)
+ if result.ResponseTime > 1*time.Second {
+ result.Status = StatusDegraded
+ result.Error = fmt.Errorf("slow connection: %v", result.ResponseTime)
+ }
+
+ return result
+}