aboutsummaryrefslogtreecommitdiff
path: root/internal/api/observability.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/api/observability.go')
-rw-r--r--internal/api/observability.go48
1 files changed, 47 insertions, 1 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go
index ebd7711..4654b28 100644
--- a/internal/api/observability.go
+++ b/internal/api/observability.go
@@ -8,7 +8,6 @@ import (
"strings"
"sync"
"time"
-
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
"github.com/Fuwn/plutia/internal/storage"
@@ -32,9 +31,11 @@ type serverMetrics struct {
func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics {
reg := prometheus.NewRegistry()
+
var diskMu sync.Mutex
var diskCached int64
var diskCachedAt time.Time
+
m := &serverMetrics{
registry: reg,
checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -47,6 +48,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
Help: "Latest checkpoint sequence generated by this mirror.",
}),
}
+
reg.MustRegister(m.checkpointDuration, m.checkpointSequence)
reg.MustRegister(prometheus.NewCounterFunc(
prometheus.CounterOpts{
@@ -55,9 +57,11 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
},
func() float64 {
seq, err := store.GetGlobalSeq()
+
if err != nil {
return 0
}
+
return float64(seq)
},
))
@@ -70,6 +74,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return ingestor.Stats().IngestOpsPerSec
},
))
@@ -82,6 +87,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return float64(ingestor.Stats().LagOps)
},
))
@@ -94,6 +100,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return float64(ingestor.Stats().VerifyFailures)
},
))
@@ -104,14 +111,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
},
func() float64 {
diskMu.Lock()
+
defer diskMu.Unlock()
+
if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second {
size, err := dirSize(cfg.DataDir)
+
if err == nil {
diskCached = size
diskCachedAt = time.Now()
}
}
+
return float64(diskCached)
},
))
@@ -124,14 +135,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor != nil {
return float64(ingestor.Stats().DIDCount)
}
+
count := uint64(0)
_ = store.ForEachState(func(_ types.StateV1) error {
count++
+
return nil
})
+
return float64(count)
},
))
+
return m
}
@@ -182,18 +197,23 @@ type ipRateLimiter struct {
func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
def := config.Default().RateLimit
+
if cfg.ResolveRPS <= 0 {
cfg.ResolveRPS = def.ResolveRPS
}
+
if cfg.ResolveBurst <= 0 {
cfg.ResolveBurst = def.ResolveBurst
}
+
if cfg.ProofRPS <= 0 {
cfg.ProofRPS = def.ProofRPS
}
+
if cfg.ProofBurst <= 0 {
cfg.ProofBurst = def.ProofBurst
}
+
return &ipRateLimiter{
buckets: map[string]*tokenBucket{},
resolve: endpointPolicy{
@@ -210,7 +230,9 @@ func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
now := time.Now()
+
l.mu.Lock()
+
defer l.mu.Unlock()
if now.Sub(l.lastSweep) > 2*time.Minute {
@@ -219,75 +241,99 @@ func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
delete(l.buckets, key)
}
}
+
l.lastSweep = now
}
policy := l.resolve
routeKey := "resolve"
+
if class == limiterProof {
policy = l.proof
routeKey = "proof"
}
+
key := routeKey + "|" + ip
b, ok := l.buckets[key]
+
if !ok {
l.buckets[key] = &tokenBucket{
tokens: policy.burst - 1,
last: now,
lastSeen: now,
}
+
return true
}
+
elapsed := now.Sub(b.last).Seconds()
+
if elapsed > 0 {
b.tokens += elapsed * policy.rps
+
if b.tokens > policy.burst {
b.tokens = policy.burst
}
}
+
b.last = now
b.lastSeen = now
+
if b.tokens < 1 {
return false
}
+
b.tokens--
+
return true
}
func clientIP(r *http.Request) string {
if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" {
parts := strings.Split(forwarded, ",")
+
if len(parts) > 0 {
if ip := strings.TrimSpace(parts[0]); ip != "" {
return ip
}
}
}
+
if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" {
return realIP
}
+
host, _, err := net.SplitHostPort(r.RemoteAddr)
+
if err == nil && host != "" {
return host
}
+
return r.RemoteAddr
}
func dirSize(path string) (int64, error) {
var total int64
+
err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
if err != nil {
return err
}
+
if d.IsDir() {
return nil
}
+
info, err := d.Info()
+
if err != nil {
return err
}
+
total += info.Size()
+
return nil
})
+
return total, err
}