about summary refs log tree commit diff
path: root/internal/api/observability.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:07:03 -0800
committerFuwn <[email protected]>2026-02-26 15:07:03 -0800
commitf1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch)
tree4003d041bc46866b7d4b7f0bfb6c6924d991cbb6 /internal/api/observability.go
parentfeat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff)
downloadplutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz
plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip
feat: harden launch readiness with versioning, metrics, and resilience
Diffstat (limited to 'internal/api/observability.go')
-rw-r--r-- internal/api/observability.go 293
1 files changed, 293 insertions, 0 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go
new file mode 100644
index 0000000..ebd7711
--- /dev/null
+++ b/internal/api/observability.go
@@ -0,0 +1,293 @@
+package api
+
import (
	"errors"
	"io/fs"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/ingest"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
+
// BuildInfo carries the binary's build provenance — release version, VCS
// commit, build timestamp, and the Go toolchain version — for reporting by
// the server (see WithBuildInfo).
type BuildInfo struct {
	Version   string `json:"version"`
	Commit    string `json:"commit"`
	BuildDate string `json:"build_date"`
	GoVersion string `json:"go_version"`
}
+
// serverMetrics bundles the server's private Prometheus registry with the
// push-style checkpoint instruments updated via ObserveCheckpoint. All other
// metrics are pull-style collectors registered in newServerMetrics.
type serverMetrics struct {
	registry           *prometheus.Registry
	checkpointDuration prometheus.Histogram // wall-clock cost of generating and signing a checkpoint
	checkpointSequence prometheus.Gauge     // latest checkpoint sequence produced by this mirror
}
+
// newServerMetrics builds the mirror's Prometheus registry and registers all
// server-level collectors. The CounterFunc/GaugeFunc collectors read live
// values from store and ingestor at scrape time; the checkpoint histogram and
// gauge are pushed via ObserveCheckpoint.
//
// store must be non-nil. ingestor may be nil: ingestion-derived metrics then
// read 0, and did_count falls back to scanning state storage.
func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics {
	reg := prometheus.NewRegistry()
	// Cache for the disk-usage gauge: walking the data directory is expensive,
	// so the last measurement is memoized (guarded by diskMu, since scrapes
	// may be concurrent) and refreshed at most every five seconds.
	var diskMu sync.Mutex
	var diskCached int64
	var diskCachedAt time.Time
	m := &serverMetrics{
		registry: reg,
		checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
			Name:    "checkpoint_duration_seconds",
			Help:    "Time spent generating and signing checkpoints.",
			Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 60},
		}),
		checkpointSequence: prometheus.NewGauge(prometheus.GaugeOpts{
			Name: "checkpoint_sequence",
			Help: "Latest checkpoint sequence generated by this mirror.",
		}),
	}
	reg.MustRegister(m.checkpointDuration, m.checkpointSequence)
	// ingest_ops_total mirrors the committed global sequence, which is
	// monotonic in normal operation. NOTE(review): a transient store error
	// makes this read 0, which scrapers will interpret as a counter reset.
	reg.MustRegister(prometheus.NewCounterFunc(
		prometheus.CounterOpts{
			Name: "ingest_ops_total",
			Help: "Total operations persisted by the mirror.",
		},
		func() float64 {
			seq, err := store.GetGlobalSeq()
			if err != nil {
				return 0
			}
			return float64(seq)
		},
	))
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "ingest_ops_per_second",
			Help: "Ingestion operations per second (process-average).",
		},
		func() float64 {
			if ingestor == nil {
				return 0
			}
			return ingestor.Stats().IngestOpsPerSec
		},
	))
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "ingest_lag_ops",
			Help: "Difference between latest observed upstream sequence and committed global sequence.",
		},
		func() float64 {
			if ingestor == nil {
				return 0
			}
			return float64(ingestor.Stats().LagOps)
		},
	))
	reg.MustRegister(prometheus.NewCounterFunc(
		prometheus.CounterOpts{
			Name: "verify_failures_total",
			Help: "Total signature/link verification failures seen during ingestion.",
		},
		func() float64 {
			if ingestor == nil {
				return 0
			}
			return float64(ingestor.Stats().VerifyFailures)
		},
	))
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "disk_bytes_total",
			Help: "Total bytes used by the configured data directory.",
		},
		func() float64 {
			diskMu.Lock()
			defer diskMu.Unlock()
			// Refresh at most every 5s. On walk error the last good value is
			// served and the timestamp is NOT advanced, so the walk is
			// retried on the very next scrape.
			if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second {
				size, err := dirSize(cfg.DataDir)
				if err == nil {
					diskCached = size
					diskCachedAt = time.Now()
				}
			}
			return float64(diskCached)
		},
	))
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Name: "did_count",
			Help: "Number of DIDs currently materialized in state storage.",
		},
		func() float64 {
			// Prefer the ingestor's cached count; otherwise fall back to a
			// full scan of state storage. A scan error is deliberately
			// ignored and yields a partial (best-effort) count.
			if ingestor != nil {
				return float64(ingestor.Stats().DIDCount)
			}
			count := uint64(0)
			_ = store.ForEachState(func(_ types.StateV1) error {
				count++
				return nil
			})
			return float64(count)
		},
	))
	return m
}
+
+func (m *serverMetrics) Handler() http.Handler {
+ return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
+ EnableOpenMetrics: true,
+ })
+}
+
+func (m *serverMetrics) ObserveCheckpoint(duration time.Duration, sequence uint64) {
+ m.checkpointDuration.Observe(duration.Seconds())
+ m.checkpointSequence.Set(float64(sequence))
+}
+
+type serverOption func(*Server)
+
+func WithBuildInfo(info BuildInfo) serverOption {
+ return func(s *Server) {
+ s.build = info
+ }
+}
+
// limiterClass selects which per-endpoint rate-limit policy applies to a
// request (see ipRateLimiter.Allow).
type limiterClass int

const (
	limiterResolve limiterClass = iota // resolve endpoints
	limiterProof                       // proof endpoints
)
+
// tokenBucket holds per-key token-bucket state: a fractional token balance
// plus the timestamps needed for refill and idle-bucket eviction.
type tokenBucket struct {
	tokens   float64   // current balance; may be fractional after refill
	last     time.Time // time of last refill calculation
	lastSeen time.Time // time of last request; used to sweep idle buckets
}

// endpointPolicy describes one token-bucket policy: sustained requests per
// second and maximum burst size.
type endpointPolicy struct {
	rps   float64
	burst float64
}
+
// ipRateLimiter enforces per-client-IP token-bucket limits with separate
// policies for resolve and proof endpoints. All fields are guarded by mu.
type ipRateLimiter struct {
	mu        sync.Mutex
	buckets   map[string]*tokenBucket // keyed by "<route>|<ip>"
	resolve   endpointPolicy
	proof     endpointPolicy
	lastSweep time.Time // last time idle buckets were evicted
}
+
+func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
+ def := config.Default().RateLimit
+ if cfg.ResolveRPS <= 0 {
+ cfg.ResolveRPS = def.ResolveRPS
+ }
+ if cfg.ResolveBurst <= 0 {
+ cfg.ResolveBurst = def.ResolveBurst
+ }
+ if cfg.ProofRPS <= 0 {
+ cfg.ProofRPS = def.ProofRPS
+ }
+ if cfg.ProofBurst <= 0 {
+ cfg.ProofBurst = def.ProofBurst
+ }
+ return &ipRateLimiter{
+ buckets: map[string]*tokenBucket{},
+ resolve: endpointPolicy{
+ rps: cfg.ResolveRPS,
+ burst: float64(cfg.ResolveBurst),
+ },
+ proof: endpointPolicy{
+ rps: cfg.ProofRPS,
+ burst: float64(cfg.ProofBurst),
+ },
+ lastSweep: time.Now(),
+ }
+}
+
+func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
+ now := time.Now()
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ if now.Sub(l.lastSweep) > 2*time.Minute {
+ for key, bucket := range l.buckets {
+ if now.Sub(bucket.lastSeen) > 15*time.Minute {
+ delete(l.buckets, key)
+ }
+ }
+ l.lastSweep = now
+ }
+
+ policy := l.resolve
+ routeKey := "resolve"
+ if class == limiterProof {
+ policy = l.proof
+ routeKey = "proof"
+ }
+ key := routeKey + "|" + ip
+ b, ok := l.buckets[key]
+ if !ok {
+ l.buckets[key] = &tokenBucket{
+ tokens: policy.burst - 1,
+ last: now,
+ lastSeen: now,
+ }
+ return true
+ }
+ elapsed := now.Sub(b.last).Seconds()
+ if elapsed > 0 {
+ b.tokens += elapsed * policy.rps
+ if b.tokens > policy.burst {
+ b.tokens = policy.burst
+ }
+ }
+ b.last = now
+ b.lastSeen = now
+ if b.tokens < 1 {
+ return false
+ }
+ b.tokens--
+ return true
+}
+
+func clientIP(r *http.Request) string {
+ if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" {
+ parts := strings.Split(forwarded, ",")
+ if len(parts) > 0 {
+ if ip := strings.TrimSpace(parts[0]); ip != "" {
+ return ip
+ }
+ }
+ }
+ if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" {
+ return realIP
+ }
+ host, _, err := net.SplitHostPort(r.RemoteAddr)
+ if err == nil && host != "" {
+ return host
+ }
+ return r.RemoteAddr
+}
+
+func dirSize(path string) (int64, error) {
+ var total int64
+ err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+ if d.IsDir() {
+ return nil
+ }
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+ total += info.Size()
+ return nil
+ })
+ return total, err
+}