diff options
| author | Fuwn <[email protected]> | 2026-02-26 15:07:03 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 15:07:03 -0800 |
| commit | f1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch) | |
| tree | 4003d041bc46866b7d4b7f0bfb6c6924d991cbb6 /internal/api/observability.go | |
| parent | feat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff) | |
| download | plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip | |
feat: harden launch readiness with versioning, metrics, and resilience
Diffstat (limited to 'internal/api/observability.go')
| -rw-r--r-- | internal/api/observability.go | 293 |
1 file changed, 293 insertions, 0 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go new file mode 100644 index 0000000..ebd7711 --- /dev/null +++ b/internal/api/observability.go @@ -0,0 +1,293 @@ +package api + +import ( + "net" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/ingest" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type BuildInfo struct { + Version string `json:"version"` + Commit string `json:"commit"` + BuildDate string `json:"build_date"` + GoVersion string `json:"go_version"` +} + +type serverMetrics struct { + registry *prometheus.Registry + checkpointDuration prometheus.Histogram + checkpointSequence prometheus.Gauge +} + +func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics { + reg := prometheus.NewRegistry() + var diskMu sync.Mutex + var diskCached int64 + var diskCachedAt time.Time + m := &serverMetrics{ + registry: reg, + checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "checkpoint_duration_seconds", + Help: "Time spent generating and signing checkpoints.", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 60}, + }), + checkpointSequence: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "checkpoint_sequence", + Help: "Latest checkpoint sequence generated by this mirror.", + }), + } + reg.MustRegister(m.checkpointDuration, m.checkpointSequence) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "ingest_ops_total", + Help: "Total operations persisted by the mirror.", + }, + func() float64 { + seq, err := store.GetGlobalSeq() + if err != nil { + return 0 + } + return float64(seq) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: 
"ingest_ops_per_second", + Help: "Ingestion operations per second (process-average).", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return ingestor.Stats().IngestOpsPerSec + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "ingest_lag_ops", + Help: "Difference between latest observed upstream sequence and committed global sequence.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return float64(ingestor.Stats().LagOps) + }, + )) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "verify_failures_total", + Help: "Total signature/link verification failures seen during ingestion.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return float64(ingestor.Stats().VerifyFailures) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "disk_bytes_total", + Help: "Total bytes used by the configured data directory.", + }, + func() float64 { + diskMu.Lock() + defer diskMu.Unlock() + if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second { + size, err := dirSize(cfg.DataDir) + if err == nil { + diskCached = size + diskCachedAt = time.Now() + } + } + return float64(diskCached) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "did_count", + Help: "Number of DIDs currently materialized in state storage.", + }, + func() float64 { + if ingestor != nil { + return float64(ingestor.Stats().DIDCount) + } + count := uint64(0) + _ = store.ForEachState(func(_ types.StateV1) error { + count++ + return nil + }) + return float64(count) + }, + )) + return m +} + +func (m *serverMetrics) Handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }) +} + +func (m *serverMetrics) ObserveCheckpoint(duration time.Duration, sequence uint64) { + m.checkpointDuration.Observe(duration.Seconds()) + m.checkpointSequence.Set(float64(sequence)) +} + 
+type serverOption func(*Server) + +func WithBuildInfo(info BuildInfo) serverOption { + return func(s *Server) { + s.build = info + } +} + +type limiterClass int + +const ( + limiterResolve limiterClass = iota + limiterProof +) + +type tokenBucket struct { + tokens float64 + last time.Time + lastSeen time.Time +} + +type endpointPolicy struct { + rps float64 + burst float64 +} + +type ipRateLimiter struct { + mu sync.Mutex + buckets map[string]*tokenBucket + resolve endpointPolicy + proof endpointPolicy + lastSweep time.Time +} + +func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { + def := config.Default().RateLimit + if cfg.ResolveRPS <= 0 { + cfg.ResolveRPS = def.ResolveRPS + } + if cfg.ResolveBurst <= 0 { + cfg.ResolveBurst = def.ResolveBurst + } + if cfg.ProofRPS <= 0 { + cfg.ProofRPS = def.ProofRPS + } + if cfg.ProofBurst <= 0 { + cfg.ProofBurst = def.ProofBurst + } + return &ipRateLimiter{ + buckets: map[string]*tokenBucket{}, + resolve: endpointPolicy{ + rps: cfg.ResolveRPS, + burst: float64(cfg.ResolveBurst), + }, + proof: endpointPolicy{ + rps: cfg.ProofRPS, + burst: float64(cfg.ProofBurst), + }, + lastSweep: time.Now(), + } +} + +func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { + now := time.Now() + l.mu.Lock() + defer l.mu.Unlock() + + if now.Sub(l.lastSweep) > 2*time.Minute { + for key, bucket := range l.buckets { + if now.Sub(bucket.lastSeen) > 15*time.Minute { + delete(l.buckets, key) + } + } + l.lastSweep = now + } + + policy := l.resolve + routeKey := "resolve" + if class == limiterProof { + policy = l.proof + routeKey = "proof" + } + key := routeKey + "|" + ip + b, ok := l.buckets[key] + if !ok { + l.buckets[key] = &tokenBucket{ + tokens: policy.burst - 1, + last: now, + lastSeen: now, + } + return true + } + elapsed := now.Sub(b.last).Seconds() + if elapsed > 0 { + b.tokens += elapsed * policy.rps + if b.tokens > policy.burst { + b.tokens = policy.burst + } + } + b.last = now + b.lastSeen = now + if b.tokens < 1 { + 
return false + } + b.tokens-- + return true +} + +func clientIP(r *http.Request) string { + if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { + parts := strings.Split(forwarded, ",") + if len(parts) > 0 { + if ip := strings.TrimSpace(parts[0]); ip != "" { + return ip + } + } + } + if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { + return realIP + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil && host != "" { + return host + } + return r.RemoteAddr +} + +func dirSize(path string) (int64, error) { + var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + info, err := d.Info() + if err != nil { + return err + } + total += info.Size() + return nil + }) + return total, err +} |