package api import ( "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "net" "net/http" "os" "path/filepath" "strings" "sync" "time" ) type BuildInfo struct { Version string `json:"version"` Commit string `json:"commit"` BuildDate string `json:"build_date"` GoVersion string `json:"go_version"` } type serverMetrics struct { registry *prometheus.Registry checkpointDuration prometheus.Histogram checkpointSequence prometheus.Gauge } func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics { reg := prometheus.NewRegistry() var diskMu sync.Mutex var diskCached int64 var diskCachedAt time.Time m := &serverMetrics{ registry: reg, checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "checkpoint_duration_seconds", Help: "Time spent generating and signing checkpoints.", Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 60}, }), checkpointSequence: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "checkpoint_sequence", Help: "Latest checkpoint sequence generated by this mirror.", }), } reg.MustRegister(m.checkpointDuration, m.checkpointSequence) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ Name: "ingest_ops_total", Help: "Total operations persisted by the mirror.", }, func() float64 { seq, err := store.GetGlobalSeq() if err != nil { return 0 } return float64(seq) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Name: "ingest_ops_per_second", Help: "Ingestion operations per second (process-average).", }, func() float64 { if ingestor == nil { return 0 } return ingestor.Stats().IngestOpsPerSec }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Name: "ingest_lag_ops", Help: "Difference between latest observed upstream sequence and committed global sequence.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().LagOps) }, )) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ Name: "verify_failures_total", Help: "Total signature/link verification failures seen during ingestion.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().VerifyFailures) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Name: "disk_bytes_total", Help: "Total bytes used by the configured data directory.", }, func() float64 { diskMu.Lock() defer diskMu.Unlock() if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second { size, err := dirSize(cfg.DataDir) if err == nil { diskCached = size diskCachedAt = time.Now() } } return float64(diskCached) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Name: "did_count", Help: "Number of DIDs currently materialized in state storage.", }, func() float64 { if ingestor != nil { return float64(ingestor.Stats().DIDCount) } count := uint64(0) _ = store.ForEachState(func(_ types.StateV1) error { count++ return nil }) return float64(count) }, )) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ Name: "thin_cache_hits_total", Help: "Total thin-mode cache hits.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().ThinCacheHits) }, )) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ Name: "thin_cache_misses_total", Help: "Total thin-mode cache misses.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().ThinCacheMisses) }, )) reg.MustRegister(prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Name: "thin_cache_entries", Help: "Current number of entries in the thin-mode DID cache.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().ThinCacheCount) }, )) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ Name: "thin_cache_evictions_total", Help: "Total entries evicted from thin-mode DID cache.", }, func() float64 { if ingestor == nil { return 0 } return float64(ingestor.Stats().ThinEvictions) }, )) return m } func (m *serverMetrics) Handler() http.Handler { return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{ EnableOpenMetrics: true, }) } func (m *serverMetrics) ObserveCheckpoint(duration time.Duration, sequence uint64) { m.checkpointDuration.Observe(duration.Seconds()) m.checkpointSequence.Set(float64(sequence)) } type serverOption func(*Server) func WithBuildInfo(info BuildInfo) serverOption { return func(s *Server) { s.build = info } } type limiterClass int const ( limiterResolve limiterClass = iota limiterProof ) type tokenBucket struct { tokens float64 last time.Time lastSeen time.Time } type endpointPolicy struct { rps float64 burst float64 } type ipRateLimiter struct { mu sync.Mutex buckets map[string]*tokenBucket resolve endpointPolicy proof endpointPolicy lastSweep time.Time } func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { def := config.Default().RateLimit if cfg.ResolveRPS <= 0 { cfg.ResolveRPS = def.ResolveRPS } if cfg.ResolveBurst <= 0 { cfg.ResolveBurst = def.ResolveBurst } if cfg.ProofRPS <= 0 { cfg.ProofRPS = def.ProofRPS } if cfg.ProofBurst <= 0 { cfg.ProofBurst = def.ProofBurst } return &ipRateLimiter{ buckets: map[string]*tokenBucket{}, resolve: endpointPolicy{ rps: cfg.ResolveRPS, burst: float64(cfg.ResolveBurst), }, proof: endpointPolicy{ rps: cfg.ProofRPS, burst: float64(cfg.ProofBurst), }, lastSweep: time.Now(), } } func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { now := time.Now() l.mu.Lock() defer l.mu.Unlock() if now.Sub(l.lastSweep) > 2*time.Minute { for key, bucket := range l.buckets { if now.Sub(bucket.lastSeen) > 15*time.Minute { delete(l.buckets, key) } } l.lastSweep = now } policy := l.resolve routeKey := "resolve" if class == limiterProof { policy = l.proof routeKey = "proof" } key := routeKey + "|" + ip b, ok := l.buckets[key] if !ok { l.buckets[key] = &tokenBucket{ tokens: policy.burst - 1, last: now, lastSeen: now, } return true } elapsed := now.Sub(b.last).Seconds() if elapsed > 0 { b.tokens += elapsed * policy.rps if b.tokens > policy.burst { b.tokens = policy.burst } } b.last = now b.lastSeen = now if b.tokens < 1 { return false } b.tokens-- return true } func clientIP(r *http.Request) string { if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { parts := strings.Split(forwarded, ",") if len(parts) > 0 { if ip := strings.TrimSpace(parts[0]); ip != "" { return ip } } } if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { return realIP } host, _, err := net.SplitHostPort(r.RemoteAddr) if err == nil && host != "" { return host } return r.RemoteAddr } func dirSize(path string) (int64, error) { var total int64 err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } total += info.Size() return nil }) return total, err }