diff options
Diffstat (limited to 'internal/api/observability.go')
| -rw-r--r-- | internal/api/observability.go | 48 |
1 files changed, 47 insertions, 1 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go index ebd7711..4654b28 100644 --- a/internal/api/observability.go +++ b/internal/api/observability.go @@ -8,7 +8,6 @@ import ( "strings" "sync" "time" - "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" @@ -32,9 +31,11 @@ type serverMetrics struct { func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics { reg := prometheus.NewRegistry() + var diskMu sync.Mutex var diskCached int64 var diskCachedAt time.Time + m := &serverMetrics{ registry: reg, checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -47,6 +48,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S Help: "Latest checkpoint sequence generated by this mirror.", }), } + reg.MustRegister(m.checkpointDuration, m.checkpointSequence) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ @@ -55,9 +57,11 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S }, func() float64 { seq, err := store.GetGlobalSeq() + if err != nil { return 0 } + return float64(seq) }, )) @@ -70,6 +74,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return ingestor.Stats().IngestOpsPerSec }, )) @@ -82,6 +87,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return float64(ingestor.Stats().LagOps) }, )) @@ -94,6 +100,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return float64(ingestor.Stats().VerifyFailures) }, )) @@ -104,14 +111,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S }, func() float64 { diskMu.Lock() + defer diskMu.Unlock() + if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second { size, err := dirSize(cfg.DataDir) + if err == nil { diskCached = size diskCachedAt = time.Now() } } + return float64(diskCached) }, )) @@ -124,14 +135,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor != nil { return float64(ingestor.Stats().DIDCount) } + count := uint64(0) _ = store.ForEachState(func(_ types.StateV1) error { count++ + return nil }) + return float64(count) }, )) + return m } @@ -182,18 +197,23 @@ type ipRateLimiter struct { func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { def := config.Default().RateLimit + if cfg.ResolveRPS <= 0 { cfg.ResolveRPS = def.ResolveRPS } + if cfg.ResolveBurst <= 0 { cfg.ResolveBurst = def.ResolveBurst } + if cfg.ProofRPS <= 0 { cfg.ProofRPS = def.ProofRPS } + if cfg.ProofBurst <= 0 { cfg.ProofBurst = def.ProofBurst } + return &ipRateLimiter{ buckets: map[string]*tokenBucket{}, resolve: endpointPolicy{ @@ -210,7 +230,9 @@ func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { now := time.Now() + l.mu.Lock() + defer l.mu.Unlock() if now.Sub(l.lastSweep) > 2*time.Minute { @@ -219,75 +241,99 @@ func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { delete(l.buckets, key) } } + l.lastSweep = now } policy := l.resolve routeKey := "resolve" + if class == limiterProof { policy = l.proof routeKey = "proof" } + key := routeKey + "|" + ip b, ok := l.buckets[key] + if !ok { l.buckets[key] = &tokenBucket{ tokens: policy.burst - 1, last: now, lastSeen: now, } + return true } + elapsed := now.Sub(b.last).Seconds() + if elapsed > 0 { b.tokens += elapsed * policy.rps + if b.tokens > policy.burst { b.tokens = policy.burst } } + b.last = now b.lastSeen = now + if b.tokens < 1 { return false } + b.tokens-- + return true } func clientIP(r *http.Request) string { if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { parts := strings.Split(forwarded, ",") + if len(parts) > 0 { if ip := strings.TrimSpace(parts[0]); ip != "" { return ip } } } + if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { return realIP } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil && host != "" { return host } + return r.RemoteAddr } func dirSize(path string) (int64, error) { var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { if err != nil { return err } + if d.IsDir() { return nil } + info, err := d.Info() + if err != nil { return err } + total += info.Size() + return nil }) + return total, err } |