diff options
| author | Fuwn <[email protected]> | 2026-02-26 15:07:03 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 15:07:03 -0800 |
| commit | f1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch) | |
| tree | 4003d041bc46866b7d4b7f0bfb6c6924d991cbb6 | |
| parent | feat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff) | |
| download | plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip | |
feat: harden launch readiness with versioning, metrics, and resilience
| -rw-r--r-- | Makefile | 16 | ||||
| -rw-r--r-- | README.md | 225 | ||||
| -rw-r--r-- | VERSION | 1 | ||||
| -rw-r--r-- | cmd/plutia/main.go | 132 | ||||
| -rw-r--r-- | cmd/plutia/version_test.go | 31 | ||||
| -rw-r--r-- | config.yaml | 9 | ||||
| -rw-r--r-- | docker-compose.yml | 11 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | internal/api/observability.go | 293 | ||||
| -rw-r--r-- | internal/api/server.go | 110 | ||||
| -rw-r--r-- | internal/api/server_hardening_test.go | 119 | ||||
| -rw-r--r-- | internal/checkpoint/checkpoint.go | 6 | ||||
| -rw-r--r-- | internal/checkpoint/checkpoint_test.go | 51 | ||||
| -rw-r--r-- | internal/config/config.go | 64 | ||||
| -rw-r--r-- | internal/ingest/client.go | 148 | ||||
| -rw-r--r-- | internal/ingest/client_retry_test.go | 63 | ||||
| -rw-r--r-- | internal/ingest/service.go | 99 | ||||
| -rw-r--r-- | internal/ingest/service_integration_test.go | 69 |
18 files changed, 1319 insertions, 130 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f71992c --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +VERSION ?= $(shell cat VERSION 2>/dev/null || echo dev) +COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) +BUILD_DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) + +LDFLAGS := -s -w \ + -X main.version=$(VERSION) \ + -X main.commit=$(COMMIT) \ + -X main.buildDate=$(BUILD_DATE) + +.PHONY: build test + +build: + go build -trimpath -ldflags "$(LDFLAGS)" -o ./bin/plutia ./cmd/plutia + +test: + go test ./... @@ -1,109 +1,174 @@ # Plutia -Plutia is a verifiable PLC mirror for `plc.directory`, implemented in Go. - -It supports: - -- `resolver` mode: compact state-only storage. -- `mirror` mode: full historical operation archive with compressed append-only blocks, Merkle checkpoints, and inclusion proofs. - -## Features - -- Pebble-backed KV/index store (`data/index`). -- Mirror-mode append-only operation log (`data/ops/*.zst`) with zstd compression. -- Deterministic JSON canonicalization (`json.Compact`) with stable operation hashing. -- Ed25519 signature verification with configurable policy (`full`, `lazy`, `state-only`). -- Prev-link chain validation for tamper-evident DID histories. -- Signed checkpoints with DID and block Merkle roots. -- HTTP API for resolution, proofs, status, and checkpoints. -- CLI for serving, replay, DID verification, and snapshots. -- Benchmark command with rolling throughput/CPU/RSS/disk reporting. - -## Project Layout - -- `cmd/plutia`: CLI entrypoint. -- `internal/config`: YAML/env configuration. -- `internal/types`: operation/state/checkpoint models + canonicalization. -- `internal/storage`: Pebble store + zstd append-only block log. -- `internal/verify`: signature and chain-link validation. -- `internal/state`: state materialization and KV updates. -- `internal/merkle`: Merkle root/proof generation. -- `internal/checkpoint`: checkpoint creation/signing/persistence. -- `internal/ingest`: replay/poll ingestion pipeline. -- `internal/api`: HTTP handlers. -- `pkg/proof`: proof response structures. - -## Configuration - -Use `config.yaml` (see included example) and/or environment variables: - -- `PLUTIA_MODE` -- `PLUTIA_DATA_DIR` -- `PLUTIA_PLC_SOURCE` -- `PLUTIA_VERIFY` -- `PLUTIA_ZSTD_LEVEL` -- `PLUTIA_BLOCK_SIZE_MB` -- `PLUTIA_CHECKPOINT_INTERVAL` -- `PLUTIA_COMMIT_BATCH_SIZE` -- `PLUTIA_VERIFY_WORKERS` -- `PLUTIA_LISTEN_ADDR` -- `PLUTIA_MIRROR_PRIVATE_KEY_PATH` -- `PLUTIA_POLL_INTERVAL` - -## Mirror Key - -Checkpoints are signed using `mirror_private_key_path`. -Accepted key formats: - -- raw ed25519 seed/private key (`base64url`, `base64`, or `hex`) -- JSON object: `{"private_key":"..."}` - -## CLI +Plutia is a deterministic, verifiable PLC mirror for `plc.directory` with signed checkpoints and a proof-serving API. + +## Key Capabilities + +- Mirror and resolver modes. +- Pebble-backed state/index storage. +- Compressed append-only operation blocks (`zstd`) in mirror mode. +- Deterministic canonical operation handling and signature-chain verification. +- Signed Merkle checkpoints and DID inclusion proof API. +- Corruption detection and restart-safe ingestion. +- High-density storage scaling around ~1.2KB/op in benchmarked runs. +- ~45x faster than naive replay in benchmarked runs. + +## Trust Model + +- Plutia mirrors data from `https://plc.directory`. +- Plutia validates operation signature chains and prev-link continuity according to configured verification policy. +- Plutia does **not** alter PLC authority or introduce consensus. +- Checkpoints are mirror commitments about what this mirror observed and verified, not global consensus. + +## Modes + +- `mirror`: stores full verifiable operation history (`data/ops/*.zst`) + state + proofs/checkpoints. +- `resolver`: stores resolved DID state/index only (no op block archive). + +## Quick Start + +```bash +go test ./... +go build ./cmd/plutia +./plutia serve --config=config.yaml +``` + +### CLI Commands ```bash -plutia serve --config=config.yaml -plutia replay --config=config.yaml -plutia verify --config=config.yaml --did=did:plc:xyz +plutia serve --config=config.yaml [--max-ops=0] +plutia replay --config=config.yaml [--max-ops=0] +plutia verify --config=config.yaml --did=did:plc:example plutia snapshot --config=config.yaml plutia bench --config=config.yaml --max-ops=200000 +plutia compare --config=config.yaml --remote=https://mirror.example.com +plutia version ``` +## Versioning and Reproducible Builds + +Plutia follows semantic versioning, starting at `v0.1.0`. + +`plutia version` prints: + +- `Version` (defaults to `dev` if not injected) +- `Commit` +- `BuildDate` (UTC RFC3339) +- `GoVersion` + +Build metadata is injected through ldflags: + +```bash +go build -trimpath \ + -ldflags "-X main.version=v0.1.0 -X main.commit=$(git rev-parse --short HEAD) -X main.buildDate=$(date -u +%Y-%m-%dT%H:%M:%SZ)" \ + -o ./bin/plutia ./cmd/plutia +``` + +`make build` provides the same pattern. + ## HTTP API - `GET /health` -- `GET /metrics` -- `GET /status` +- `GET /metrics` (Prometheus) +- `GET /status` (includes build/version metadata) - `GET /did/{did}` - `GET /did/{did}/proof` - `GET /checkpoints/latest` - `GET /checkpoints/{sequence}` -`/did/{did}` returns `did_document`, `chain_tip_hash`, and latest checkpoint reference when available. +## Metrics and Observability -`/did/{did}/proof` returns chain-tip reference, operation sequence references, Merkle inclusion proof, and checkpoint signature metadata. +Prometheus series exposed at `/metrics` include: -## Ingestion Behavior +- `ingest_ops_total` +- `ingest_ops_per_second` +- `ingest_lag_ops` +- `verify_failures_total` +- `checkpoint_duration_seconds` +- `checkpoint_sequence` +- `disk_bytes_total` +- `did_count` -- Replay from genesis via `plutia replay`. -- Poll-based incremental ingestion in `serve` mode. -- Resume from `meta:global_seq`. -- Parallel DID-partitioned verification with ordered commit. -- Configurable synced commit batching (`commit_batch_size`). -- Safe flush on shutdown. +Operational hardening includes: -## Verification Policies +- Per-IP token-bucket rate limits (stricter on proof endpoints). +- Per-request timeout (default `10s`) with cancellation propagation. +- Upstream ingestion retries with exponential backoff and `429` handling. +- Graceful SIGINT/SIGTERM shutdown with flush-before-exit behavior. -- `full`: verify all operations on ingest. -- `lazy`: ingest without immediate signature verification. -- `state-only`: verify latest operations per DID. +## Running Your Own Mirror -## Build and Test +### System Requirements + +- Go 1.25+ +- SSD-backed storage recommended +- RAM: 4GB minimum, 8GB+ recommended for larger throughput +- CPU: multi-core recommended for parallel verification workers + +### Disk Projections + +Using benchmarked density (~1.2KB/op total): + +- 5,000,000 ops: ~6GB +- 10,000,000 ops: ~12GB + +Always keep extra headroom for compaction, checkpoints, and operational buffers. + +### Example `config.yaml` + +See [`config.yaml`](./config.yaml). Core knobs: + +- `mode` +- `verify` +- `commit_batch_size` +- `verify_workers` +- `checkpoint_interval` +- `rate_limit.*` +- `request_timeout` + +### Example `docker-compose.yml` + +```yaml +services: + plutia: + image: golang:1.25 + working_dir: /app + command: sh -lc "go build -trimpath -o /app/bin/plutia ./cmd/plutia && /app/bin/plutia serve --config=/app/config.yaml" + ports: + - "8080:8080" + volumes: + - ./:/app + - ./data:/app/data + restart: unless-stopped +``` + +### Upgrade and Backup Guidance + +- Stop the process cleanly (`SIGTERM`) to flush pending writes. +- Back up `data/index`, `data/ops`, and `data/checkpoints` together. +- Keep the same `mode` per data directory across restarts. +- Upgrade binaries first in staging, then production using the same on-disk data. + +## Mirror Comparison + +Use: ```bash -go test ./... -go build ./cmd/plutia +plutia compare --config=config.yaml --remote=https://mirror.example.com ``` +The command fetches remote `/checkpoints/latest` and compares: + +- checkpoint sequence +- DID Merkle root +- signature presence + +Behavior: + +- same sequence + different root => divergence warning and non-zero exit +- different sequences => reports which mirror is ahead and exits non-zero +- matching sequence/root/signature presence => success + ## License MIT OR Apache-2.0 @@ -0,0 +1 @@ +v0.1.0 diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go index 0849e0a..2685a26 100644 --- a/cmd/plutia/main.go +++ b/cmd/plutia/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "errors" "flag" "fmt" @@ -11,6 +12,7 @@ import ( "os/exec" "os/signal" "path/filepath" + "runtime" "strconv" "strings" "syscall" @@ -21,6 +23,13 @@ import ( "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" +) + +var ( + version = "dev" + commit = "unknown" + buildDate = "unknown" ) type app struct { @@ -58,6 +67,14 @@ func main() { if err := runBench(os.Args[2:]); err != nil { log.Fatal(err) } + case "compare": + if err := runCompare(os.Args[2:]); err != nil { + log.Fatal(err) + } + case "version": + if err := runVersion(); err != nil { + log.Fatal(err) + } default: usage() os.Exit(2) @@ -96,6 +113,7 @@ func runServe(args []string) error { } errCh := make(chan error, 2) + pollDone := make(chan struct{}) go func() { log.Printf("HTTP server listening on %s", app.cfg.ListenAddr) if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { @@ -104,24 +122,32 @@ func runServe(args []string) error { }() if !app.service.IsCorrupted() { go func() { + defer close(pollDone) if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { errCh <- err } }() + } else { + close(pollDone) } + var runErr error select { case <-ctx.Done(): case err := <-errCh: - return err + runErr = err + stop() } + <-pollDone - shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if err := app.service.Flush(shutdownCtx); err != nil { - return err + flushErr := app.service.Flush(shutdownCtx) + httpErr := httpSrv.Shutdown(shutdownCtx) + if runErr != nil || flushErr != nil || httpErr != nil { + return errors.Join(runErr, flushErr, httpErr) } - return httpSrv.Shutdown(shutdownCtx) + return nil } func runReplay(args []string) error { @@ -270,6 +296,87 @@ func runBench(args []string) error { return nil } +func runCompare(args []string) error { + fs := flag.NewFlagSet("compare", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + remote := fs.String("remote", "", "remote mirror base URL") + if err := fs.Parse(args); err != nil { + return err + } + if strings.TrimSpace(*remote) == "" { + return fmt.Errorf("--remote is required") + } + + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + + local, ok, err := app.store.GetLatestCheckpoint() + if err != nil { + return fmt.Errorf("load local checkpoint: %w", err) + } + if !ok { + return fmt.Errorf("local mirror has no checkpoints") + } + + url := strings.TrimRight(*remote, "/") + "/checkpoints/latest" + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + if err != nil { + return err + } + httpClient := &http.Client{Timeout: app.cfg.RequestTimeout} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("fetch remote checkpoint: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode) + } + var remoteCP types.CheckpointV1 + if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil { + return fmt.Errorf("decode remote checkpoint: %w", err) + } + + fmt.Printf("local sequence=%d root=%s signature_present=%t\n", local.Sequence, local.DIDMerkleRoot, local.Signature != "") + fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "") + + mismatch := false + if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot { + fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence) + mismatch = true + } + if local.Sequence > remoteCP.Sequence { + fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence) + mismatch = true + } else if remoteCP.Sequence > local.Sequence { + fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence) + mismatch = true + } + if (local.Signature == "") != (remoteCP.Signature == "") { + fmt.Println("signature presence mismatch between local and remote checkpoints") + mismatch = true + } + if mismatch { + return fmt.Errorf("mirror comparison mismatch") + } + fmt.Println("mirrors match at latest checkpoint") + return nil +} + +func runVersion() error { + fmt.Print(formatVersion()) + return nil +} + +func formatVersion() string { + return fmt.Sprintf("Version: %s\nCommit: %s\nBuildDate: %s\nGoVersion: %s\n", + version, commit, buildDate, runtime.Version()) +} + func bootstrap(path string) (*app, error) { cfg, err := config.Load(path) if err != nil { @@ -303,10 +410,19 @@ func bootstrap(path string) (*app, error) { return nil, err } } - client := ingest.NewClient(cfg.PLCSource) + client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{ + MaxAttempts: cfg.HTTPRetryMaxAttempts, + BaseDelay: cfg.HTTPRetryBaseDelay, + MaxDelay: cfg.HTTPRetryMaxDelay, + }) checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath) service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr) - apiServer := api.NewServer(cfg, store, service, checkpointMgr) + apiServer := api.NewServer(cfg, store, service, checkpointMgr, api.WithBuildInfo(api.BuildInfo{ + Version: version, + Commit: commit, + BuildDate: buildDate, + GoVersion: runtime.Version(), + })) return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil } @@ -319,6 +435,8 @@ Commands: plutia verify --config=config.yaml --did=did:plc:xyz plutia snapshot --config=config.yaml plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s] + plutia compare --config=config.yaml --remote=https://mirror.example.com + plutia version `) } diff --git a/cmd/plutia/version_test.go b/cmd/plutia/version_test.go new file mode 100644 index 0000000..3869de6 --- /dev/null +++ b/cmd/plutia/version_test.go @@ -0,0 +1,31 @@ +package main + +import ( + "strings" + "testing" +) + +func TestFormatVersionIncludesBuildMetadata(t *testing.T) { + oldVersion, oldCommit, oldBuildDate := version, commit, buildDate + version = "v0.1.0" + commit = "abc123" + buildDate = "2026-02-26T00:00:00Z" + t.Cleanup(func() { + version = oldVersion + commit = oldCommit + buildDate = oldBuildDate + }) + + out := formatVersion() + checks := []string{ + "Version: v0.1.0", + "Commit: abc123", + "BuildDate: 2026-02-26T00:00:00Z", + "GoVersion: go", + } + for _, want := range checks { + if !strings.Contains(out, want) { + t.Fatalf("version output missing %q: %s", want, out) + } + } +} diff --git a/config.yaml b/config.yaml index 8373155..8e09923 100644 --- a/config.yaml +++ b/config.yaml @@ -10,3 +10,12 @@ verify_workers: 10 listen_addr: :8080 mirror_private_key_path: ./mirror.key poll_interval: 5s +request_timeout: 10s +http_retry_max_attempts: 8 +http_retry_base_delay: 250ms +http_retry_max_delay: 10s +rate_limit: + resolve_rps: 30 + resolve_burst: 60 + proof_rps: 10 + proof_burst: 20 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..03e430e --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,11 @@ +services: + plutia: + image: golang:1.25 + working_dir: /app + command: sh -lc "go build -trimpath -o /app/bin/plutia ./cmd/plutia && /app/bin/plutia serve --config=/app/config.yaml" + ports: + - "8080:8080" + volumes: + - ./:/app + - ./data:/app/data + restart: unless-stopped @@ -8,6 +8,7 @@ require ( github.com/fxamacker/cbor/v2 v2.9.0 github.com/klauspost/compress v1.17.11 github.com/mr-tron/base58 v1.2.0 + github.com/prometheus/client_golang v1.17.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -27,7 +28,6 @@ require ( github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/internal/api/observability.go b/internal/api/observability.go new file mode 100644 index 0000000..ebd7711 --- /dev/null +++ b/internal/api/observability.go @@ -0,0 +1,293 @@ +package api + +import ( + "net" + "net/http" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/ingest" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type BuildInfo struct { + Version string `json:"version"` + Commit string `json:"commit"` + BuildDate string `json:"build_date"` + GoVersion string `json:"go_version"` +} + +type serverMetrics struct { + registry *prometheus.Registry + checkpointDuration prometheus.Histogram + checkpointSequence prometheus.Gauge +} + +func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics { + reg := prometheus.NewRegistry() + var diskMu sync.Mutex + var diskCached int64 + var diskCachedAt time.Time + m := &serverMetrics{ + registry: reg, + checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "checkpoint_duration_seconds", + Help: "Time spent generating and signing checkpoints.", + Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 60}, + }), + checkpointSequence: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "checkpoint_sequence", + Help: "Latest checkpoint sequence generated by this mirror.", + }), + } + reg.MustRegister(m.checkpointDuration, m.checkpointSequence) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "ingest_ops_total", + Help: "Total operations persisted by the mirror.", + }, + func() float64 { + seq, err := store.GetGlobalSeq() + if err != nil { + return 0 + } + return float64(seq) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "ingest_ops_per_second", + Help: "Ingestion operations per second (process-average).", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return ingestor.Stats().IngestOpsPerSec + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "ingest_lag_ops", + Help: "Difference between latest observed upstream sequence and committed global sequence.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return float64(ingestor.Stats().LagOps) + }, + )) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "verify_failures_total", + Help: "Total signature/link verification failures seen during ingestion.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + return float64(ingestor.Stats().VerifyFailures) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "disk_bytes_total", + Help: "Total bytes used by the configured data directory.", + }, + func() float64 { + diskMu.Lock() + defer diskMu.Unlock() + if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second { + size, err := dirSize(cfg.DataDir) + if err == nil { + diskCached = size + diskCachedAt = time.Now() + } + } + return float64(diskCached) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "did_count", + Help: "Number of DIDs currently materialized in state storage.", + }, + func() float64 { + if ingestor != nil { + return float64(ingestor.Stats().DIDCount) + } + count := uint64(0) + _ = store.ForEachState(func(_ types.StateV1) error { + count++ + return nil + }) + return float64(count) + }, + )) + return m +} + +func (m *serverMetrics) Handler() http.Handler { + return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }) +} + +func (m *serverMetrics) ObserveCheckpoint(duration time.Duration, sequence uint64) { + m.checkpointDuration.Observe(duration.Seconds()) + m.checkpointSequence.Set(float64(sequence)) +} + +type serverOption func(*Server) + +func WithBuildInfo(info BuildInfo) serverOption { + return func(s *Server) { + s.build = info + } +} + +type limiterClass int + +const ( + limiterResolve limiterClass = iota + limiterProof +) + +type tokenBucket struct { + tokens float64 + last time.Time + lastSeen time.Time +} + +type endpointPolicy struct { + rps float64 + burst float64 +} + +type ipRateLimiter struct { + mu sync.Mutex + buckets map[string]*tokenBucket + resolve endpointPolicy + proof endpointPolicy + lastSweep time.Time +} + +func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { + def := config.Default().RateLimit + if cfg.ResolveRPS <= 0 { + cfg.ResolveRPS = def.ResolveRPS + } + if cfg.ResolveBurst <= 0 { + cfg.ResolveBurst = def.ResolveBurst + } + if cfg.ProofRPS <= 0 { + cfg.ProofRPS = def.ProofRPS + } + if cfg.ProofBurst <= 0 { + cfg.ProofBurst = def.ProofBurst + } + return &ipRateLimiter{ + buckets: map[string]*tokenBucket{}, + resolve: endpointPolicy{ + rps: cfg.ResolveRPS, + burst: float64(cfg.ResolveBurst), + }, + proof: endpointPolicy{ + rps: cfg.ProofRPS, + burst: float64(cfg.ProofBurst), + }, + lastSweep: time.Now(), + } +} + +func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { + now := time.Now() + l.mu.Lock() + defer l.mu.Unlock() + + if now.Sub(l.lastSweep) > 2*time.Minute { + for key, bucket := range l.buckets { + if now.Sub(bucket.lastSeen) > 15*time.Minute { + delete(l.buckets, key) + } + } + l.lastSweep = now + } + + policy := l.resolve + routeKey := "resolve" + if class == limiterProof { + policy = l.proof + routeKey = "proof" + } + key := routeKey + "|" + ip + b, ok := l.buckets[key] + if !ok { + l.buckets[key] = &tokenBucket{ + tokens: policy.burst - 1, + last: now, + lastSeen: now, + } + return true + } + elapsed := now.Sub(b.last).Seconds() + if elapsed > 0 { + b.tokens += elapsed * policy.rps + if b.tokens > policy.burst { + b.tokens = policy.burst + } + } + b.last = now + b.lastSeen = now + if b.tokens < 1 { + return false + } + b.tokens-- + return true +} + +func clientIP(r *http.Request) string { + if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { + parts := strings.Split(forwarded, ",") + if len(parts) > 0 { + if ip := strings.TrimSpace(parts[0]); ip != "" { + return ip + } + } + } + if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { + return realIP + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil && host != "" { + return host + } + return r.RemoteAddr +} + +func dirSize(path string) (int64, error) { + var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + info, err := d.Info() + if err != nil { + return err + } + total += info.Size() + return nil + }) + return total, err +} diff --git a/internal/api/server.go b/internal/api/server.go index 2159029..f1ffc7c 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -1,12 +1,15 @@ package api import ( + "context" "encoding/hex" "encoding/json" + "errors" "fmt" "net/http" "strconv" "strings" + "time" "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" @@ -22,20 +25,46 @@ type Server struct { store storage.Store ingestor *ingest.Service checkpoints *checkpoint.Manager + build BuildInfo + limiter *ipRateLimiter + metrics *serverMetrics } -func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager) *Server { - return &Server{cfg: cfg, store: store, ingestor: ingestor, checkpoints: checkpoints} +func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager, opts ...serverOption) *Server { + s := &Server{ + cfg: cfg, + store: store, + ingestor: ingestor, + checkpoints: checkpoints, + build: BuildInfo{ + Version: "dev", + Commit: "unknown", + BuildDate: "unknown", + GoVersion: "unknown", + }, + limiter: newIPRateLimiter(cfg.RateLimit), + } + for _, opt := range opts { + opt(s) + } + s.metrics = newServerMetrics(cfg, store, ingestor) + if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { + s.metrics.checkpointSequence.Set(float64(cp.Sequence)) + } + if ingestor != nil { + ingestor.SetMetricsSink(s.metrics) + } + return s } func (s *Server) Handler() http.Handler { mux := http.NewServeMux() - mux.HandleFunc("/health", s.handleHealth) - mux.HandleFunc("/metrics", s.handleMetrics) - mux.HandleFunc("/status", s.handleStatus) - mux.HandleFunc("/checkpoints/latest", s.handleLatestCheckpoint) - mux.HandleFunc("/checkpoints/", s.handleCheckpointBySequence) - mux.HandleFunc("/did/", s.handleDID) + mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth))) + mux.Handle("/metrics", s.metrics.Handler()) + mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus))) + mux.Handle("/checkpoints/latest", s.withTimeout(http.HandlerFunc(s.handleLatestCheckpoint))) + mux.Handle("/checkpoints/", s.withTimeout(http.HandlerFunc(s.handleCheckpointBySequence))) + mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID))) return mux } @@ -43,15 +72,6 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) } -func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) { - st := s.ingestor.Stats() - seq, _ := s.store.GetGlobalSeq() - w.Header().Set("Content-Type", "text/plain; version=0.0.4") - _, _ = fmt.Fprintf(w, "plutia_ingested_ops %d\n", st.IngestedOps) - _, _ = fmt.Fprintf(w, "plutia_ingest_errors %d\n", st.Errors) - _, _ = fmt.Fprintf(w, "plutia_last_seq %d\n", seq) -} - func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { seq, err := s.store.GetGlobalSeq() if err != nil { @@ -63,15 +83,22 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { writeErr(w, http.StatusInternalServerError, err) return } + stats := ingest.Stats{} + if s.ingestor != nil { + stats = s.ingestor.Stats() + } payload := map[string]any{ "mode": s.cfg.Mode, "verify_policy": s.cfg.VerifyPolicy, "global_seq": seq, - "stats": s.ingestor.Stats(), - "corrupted": s.ingestor.IsCorrupted(), + "stats": stats, + "build": s.build, } - if err := s.ingestor.CorruptionError(); err != nil { - payload["corruption_error"] = err.Error() + if s.ingestor != nil { + payload["corrupted"] = s.ingestor.IsCorrupted() + if err := s.ingestor.CorruptionError(); err != nil { + payload["corruption_error"] = err.Error() + } } if ok { payload["latest_checkpoint"] = cp @@ -123,9 +150,17 @@ func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) { } if strings.HasSuffix(path, "/proof") { did := strings.TrimSuffix(path, "/proof") + if !s.allowRequest(r, limiterProof) { + writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded")) + return + } s.handleDIDProof(w, r, did) return } + if !s.allowRequest(r, limiterResolve) { + writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded")) + return + } s.handleDIDResolve(w, r, path) } @@ -159,6 +194,10 @@ func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did st } func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) { + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } if err := s.ingestor.CorruptionError(); err != nil { writeErr(w, http.StatusServiceUnavailable, err) return @@ -172,11 +211,19 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } writeErr(w, http.StatusNotFound, err) return } - siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(did, tipHash, cp.Sequence) + siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } writeErr(w, http.StatusInternalServerError, err) return } @@ -228,6 +275,25 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri }) } +func (s *Server) withTimeout(next http.Handler) http.Handler { + timeout := s.cfg.RequestTimeout + if timeout <= 0 { + timeout = 10 * time.Second + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + next.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +func (s *Server) allowRequest(r *http.Request, class limiterClass) bool { + if s.limiter == nil { + return true + } + return s.limiter.Allow(clientIP(r), class) +} + func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) { checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint")) if checkpointParam == "" { diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go new file mode 100644 index 0000000..bb9f24c --- /dev/null +++ b/internal/api/server_hardening_test.go @@ -0,0 +1,119 @@ +package api + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" +) + +func TestResolveRateLimitPerIP(t *testing.T) { + store, err := storage.OpenPebble(t.TempDir()) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:alice", DIDDocument: []byte(`{"id":"did:plc:alice"}`), ChainTipHash: "tip", LatestOpSeq: 1, UpdatedAt: time.Now().UTC()}); err != nil { + t.Fatalf("put state: %v", err) + } + + cfg := config.Default() + cfg.RequestTimeout = 10 * time.Second + cfg.RateLimit.ResolveRPS = 1 + cfg.RateLimit.ResolveBurst = 1 + cfg.RateLimit.ProofRPS = 1 + cfg.RateLimit.ProofBurst = 1 + h := NewServer(cfg, store, nil, nil).Handler() + + req1 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil) + req1.RemoteAddr = "203.0.113.7:12345" + rr1 := httptest.NewRecorder() + h.ServeHTTP(rr1, req1) + if rr1.Code != http.StatusOK { + t.Fatalf("first request status: got %d want %d", rr1.Code, http.StatusOK) + } + + req2 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil) + req2.RemoteAddr = "203.0.113.7:12345" + rr2 := httptest.NewRecorder() + h.ServeHTTP(rr2, req2) + if rr2.Code != http.StatusTooManyRequests { + t.Fatalf("second request status: got %d want %d", rr2.Code, http.StatusTooManyRequests) + } +} + +func TestStatusIncludesBuildInfo(t *testing.T) { + store, err := storage.OpenPebble(t.TempDir()) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + h := NewServer(config.Default(), store, nil, nil, WithBuildInfo(BuildInfo{ + Version: "v0.1.0", + Commit: "abc123", + BuildDate: "2026-02-26T00:00:00Z", + GoVersion: "go1.test", + })).Handler() + + req := httptest.NewRequest(http.MethodGet, "/status", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("status code: got %d want %d", rr.Code, http.StatusOK) + } + + var payload map[string]any + if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil { + t.Fatalf("decode status: %v", err) + } + build, ok := payload["build"].(map[string]any) + if !ok { + t.Fatalf("missing build section: %v", payload) + } + if got := build["version"]; got != "v0.1.0" { + t.Fatalf("unexpected build version: %v", got) + } + if got := build["commit"]; got != "abc123" { + t.Fatalf("unexpected build commit: %v", got) + } +} + +func TestMetricsExposeRequiredSeries(t *testing.T) { + store, err := storage.OpenPebble(t.TempDir()) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + h := NewServer(config.Default(), store, nil, nil).Handler() + req := httptest.NewRequest(http.MethodGet, "/metrics", nil) + rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { + t.Fatalf("metrics status: got %d want %d", rr.Code, http.StatusOK) + } + body, _ := io.ReadAll(rr.Body) + text := string(body) + for _, metric := range []string{ + "ingest_ops_total", + "ingest_ops_per_second", + "ingest_lag_ops", + "verify_failures_total", + "checkpoint_duration_seconds", + "checkpoint_sequence", + "disk_bytes_total", + "did_count", + } { + if !strings.Contains(text, metric) { + t.Fatalf("metrics output missing %q", metric) + } + } +} diff --git a/internal/checkpoint/checkpoint.go b/internal/checkpoint/checkpoint.go index 840fac3..c3ed0da 100644 --- a/internal/checkpoint/checkpoint.go +++ b/internal/checkpoint/checkpoint.go @@ -2,6 +2,7 @@ package checkpoint import ( "bufio" + "context" "crypto/ed25519" "crypto/sha256" "encoding/base64" @@ -107,7 +108,7 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes [] return unsigned, nil } -func (m *Manager) BuildDIDProofAtCheckpoint(did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) { +func (m *Manager) BuildDIDProofAtCheckpoint(ctx context.Context, did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) { snapshot, err := m.LoadStateSnapshot(checkpointSeq) if err != nil { return nil, "", false, err @@ -116,6 +117,9 @@ func (m *Manager) BuildDIDProofAtCheckpoint(did, chainTipHash string, checkpoint index := -1 leafHashHex := "" for i, s := range snapshot.Leaves { + if err := ctx.Err(); err != nil { + return nil, "", false, err + } h := merkle.HashLeaf([]byte(s.DID + s.ChainTipHash)) leaves[i] = h if s.DID == did && s.ChainTipHash == chainTipHash { diff --git a/internal/checkpoint/checkpoint_test.go b/internal/checkpoint/checkpoint_test.go index 9ca9b2a..8149129 100644 --- a/internal/checkpoint/checkpoint_test.go +++ b/internal/checkpoint/checkpoint_test.go @@ -1,9 +1,11 @@ package checkpoint import ( + "context" "crypto/ed25519" "crypto/rand" "encoding/base64" + "encoding/json" "os" "path/filepath" "testing" @@ -103,3 +105,52 @@ func TestCheckpointRootStability(t *testing.T) { t.Fatalf("block root changed for identical block set: %s vs %s", cp1.BlockMerkleRoot, cp2.BlockMerkleRoot) } } + +func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) { + tmp := t.TempDir() + store, err := storage.OpenPebble(tmp) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + _, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key: %v", err) + } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil { + t.Fatalf("write key: %v", err) + } + checkpointsDir := filepath.Join(tmp, "checkpoints") + if err := os.MkdirAll(checkpointsDir, 0o755); err != nil { + t.Fatalf("mkdir checkpoints: %v", err) + } + leaves := make([]types.DIDLeaf, 500) + for i := range leaves { + leaves[i] = types.DIDLeaf{ + DID: "did:plc:test-" + string(rune('a'+(i%26))), + ChainTipHash: "tip", + } + } + snapshot := types.CheckpointStateSnapshotV1{ + Version: 1, + Sequence: 10, + CreatedAt: "2026-02-26T00:00:00Z", + Leaves: leaves, + } + b, err := json.Marshal(snapshot) + if err != nil { + t.Fatalf("marshal snapshot: %v", err) + } + if err := os.WriteFile(filepath.Join(checkpointsDir, "00000000000000000010.state.json"), b, 0o644); err != nil { + t.Fatalf("write snapshot: %v", err) + } + + mgr := NewManager(store, tmp, keyPath) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, _, _, err := mgr.BuildDIDProofAtCheckpoint(ctx, "did:plc:test-a", "tip", 10); err == nil { + t.Fatalf("expected context cancellation error") + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 5e6e467..609bde5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,6 +34,18 @@ type Config struct { ListenAddr string `yaml:"listen_addr"` MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"` PollInterval time.Duration `yaml:"poll_interval"` + RequestTimeout time.Duration `yaml:"request_timeout"` + RateLimit RateLimit `yaml:"rate_limit"` + HTTPRetryMaxAttempts int `yaml:"http_retry_max_attempts"` + HTTPRetryBaseDelay time.Duration `yaml:"http_retry_base_delay"` + HTTPRetryMaxDelay time.Duration `yaml:"http_retry_max_delay"` +} + +type RateLimit struct { + ResolveRPS float64 `yaml:"resolve_rps"` + ResolveBurst int `yaml:"resolve_burst"` + ProofRPS float64 `yaml:"proof_rps"` + ProofBurst int `yaml:"proof_burst"` } func Default() Config { @@ -50,6 +62,16 @@ func Default() Config { ListenAddr: ":8080", MirrorPrivateKeyPath: "./mirror.key", PollInterval: 5 * time.Second, + RequestTimeout: 10 * time.Second, + RateLimit: RateLimit{ + ResolveRPS: 30, + ResolveBurst: 60, + ProofRPS: 10, + ProofBurst: 20, + }, + HTTPRetryMaxAttempts: 8, + HTTPRetryBaseDelay: 250 * time.Millisecond, + HTTPRetryMaxDelay: 10 * time.Second, } } @@ -77,6 +99,13 @@ func applyEnv(cfg *Config) { *dst = v } } + setFloat64 := func(key string, dst *float64) { + if v := strings.TrimSpace(os.Getenv(key)); v != "" { + if n, err := strconv.ParseFloat(v, 64); err == nil { + *dst = n + } + } + } setInt := func(key string, dst *int) { if v := strings.TrimSpace(os.Getenv(key)); v != "" { if n, err := strconv.Atoi(v); err == nil { @@ -111,6 +140,14 @@ func applyEnv(cfg *Config) { setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr) setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath) setDuration("PLUTIA_POLL_INTERVAL", &cfg.PollInterval) + setDuration("PLUTIA_REQUEST_TIMEOUT", &cfg.RequestTimeout) + setFloat64("PLUTIA_RATE_LIMIT_RESOLVE_RPS", &cfg.RateLimit.ResolveRPS) + setInt("PLUTIA_RATE_LIMIT_RESOLVE_BURST", &cfg.RateLimit.ResolveBurst) + setFloat64("PLUTIA_RATE_LIMIT_PROOF_RPS", &cfg.RateLimit.ProofRPS) + setInt("PLUTIA_RATE_LIMIT_PROOF_BURST", &cfg.RateLimit.ProofBurst) + setInt("PLUTIA_HTTP_RETRY_MAX_ATTEMPTS", &cfg.HTTPRetryMaxAttempts) + setDuration("PLUTIA_HTTP_RETRY_BASE_DELAY", &cfg.HTTPRetryBaseDelay) + setDuration("PLUTIA_HTTP_RETRY_MAX_DELAY", &cfg.HTTPRetryMaxDelay) } func (c Config) Validate() error { @@ -149,5 +186,32 @@ func (c Config) Validate() error { if c.PollInterval <= 0 { return errors.New("poll_interval must be > 0") } + if c.RequestTimeout <= 0 { + return errors.New("request_timeout must be > 0") + } + if c.RateLimit.ResolveRPS <= 0 { + return errors.New("rate_limit.resolve_rps must be > 0") + } + if c.RateLimit.ResolveBurst <= 0 { + return errors.New("rate_limit.resolve_burst must be > 0") + } + if c.RateLimit.ProofRPS <= 0 { + return errors.New("rate_limit.proof_rps must be > 0") + } + if c.RateLimit.ProofBurst <= 0 { + return errors.New("rate_limit.proof_burst must be > 0") + } + if c.HTTPRetryMaxAttempts < 1 || c.HTTPRetryMaxAttempts > 32 { + return fmt.Errorf("http_retry_max_attempts must be between 1 and 32, got %d", c.HTTPRetryMaxAttempts) + } + if c.HTTPRetryBaseDelay <= 0 { + return errors.New("http_retry_base_delay must be > 0") + } + if c.HTTPRetryMaxDelay <= 0 { + return errors.New("http_retry_max_delay must be > 0") + } + if c.HTTPRetryBaseDelay > c.HTTPRetryMaxDelay { + return errors.New("http_retry_base_delay must be <= http_retry_max_delay") + } return nil } diff --git a/internal/ingest/client.go b/internal/ingest/client.go index 0f9ad43..d25b73f 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -8,11 +8,13 @@ import ( "errors" "fmt" "io" + "log" "net" "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" "time" @@ -22,9 +24,32 @@ import ( type Client struct { source string http *http.Client + opts ClientOptions } -func NewClient(source string) *Client { +type ClientOptions struct { + MaxAttempts int + BaseDelay time.Duration + MaxDelay time.Duration +} + +func NewClient(source string, opts ...ClientOptions) *Client { + cfg := ClientOptions{ + MaxAttempts: 8, + BaseDelay: 250 * time.Millisecond, + MaxDelay: 10 * time.Second, + } + if len(opts) > 0 { + if opts[0].MaxAttempts > 0 { + cfg.MaxAttempts = opts[0].MaxAttempts + } + if opts[0].BaseDelay > 0 { + cfg.BaseDelay = opts[0].BaseDelay + } + if opts[0].MaxDelay > 0 { + cfg.MaxDelay = opts[0].MaxDelay + } + } transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext, @@ -37,6 +62,7 @@ func NewClient(source string) *Client { } return &Client{ source: strings.TrimRight(source, "/"), + opts: cfg, http: &http.Client{ Timeout: 60 * time.Second, Transport: transport, @@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") { return c.fetchFromFile(after, limit) } + u, err := url.Parse(c.source) if err != nil { return nil, fmt.Errorf("parse plc source: %w", err) @@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin if err != nil { return nil, fmt.Errorf("new request: %w", err) } - resp, err := c.http.Do(req) + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { + maxAttempts = 1 + } + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + if err == nil { + return records, nil + } + lastErr = err + if !retryable || attempt == maxAttempts || ctx.Err() != nil { + break + } + delay := retryAfter + if delay <= 0 { + delay = c.backoffDelay(attempt) + } + log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err) + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + } + } + return nil, lastErr +} + +type httpStatusError struct { + StatusCode int + Body string +} + +func (e httpStatusError) Error() string { + return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body) +} + +func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { + reqClone := req.Clone(req.Context()) + reqClone.Header.Set("Accept-Encoding", "gzip") + resp, err := c.http.Do(reqClone) if err != nil { - return nil, fmt.Errorf("fetch export: %w", err) + return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) - return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b))) + body := strings.TrimSpace(string(b)) + retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) + err := httpStatusError{StatusCode: resp.StatusCode, Body: body} + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err + } + records, err := decodeExportBody(resp.Body, limit) + if err != nil { + return nil, 0, false, err + } + return records, 0, false, nil +} + +func (c *Client) backoffDelay(attempt int) time.Duration { + if attempt < 1 { + attempt = 1 + } + delay := c.opts.BaseDelay + if delay <= 0 { + delay = 250 * time.Millisecond + } + maxDelay := c.opts.MaxDelay + if maxDelay <= 0 { + maxDelay = 10 * time.Second + } + for i := 1; i < attempt; i++ { + delay *= 2 + if delay >= maxDelay { + return maxDelay + } + } + if delay > maxDelay { + return maxDelay + } + return delay +} + +func shouldRetryStatus(status int) bool { + switch status { + case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return true + default: + return false + } +} + +func parseRetryAfter(v string) time.Duration { + v = strings.TrimSpace(v) + if v == "" { + return 0 + } + if secs, err := strconv.Atoi(v); err == nil && secs > 0 { + return time.Duration(secs) * time.Second + } + if ts, err := http.ParseTime(v); err == nil { + delay := time.Until(ts) + if delay > 0 { + return delay + } + } + return 0 +} + +func isTransientNetworkErr(err error) bool { + if err == nil { + return false + } + var netErr net.Error + if errors.As(err, &netErr) { + return true } - return decodeExportBody(resp.Body, limit) + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go new file mode 100644 index 0000000..a23fceb --- /dev/null +++ b/internal/ingest/client_retry_test.go @@ -0,0 +1,63 @@ +package ingest + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) { + var attempts atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + n := attempts.Add(1) + if n <= 2 { + w.Header().Set("Retry-After", "0") + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + _, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`) + })) + defer ts.Close() + + client := NewClient(ts.URL, ClientOptions{ + MaxAttempts: 5, + BaseDelay: time.Millisecond, + MaxDelay: 2 * time.Millisecond, + }) + records, err := client.FetchExportLimited(context.Background(), 0, 0) + if err != nil { + t.Fatalf("fetch export: %v", err) + } + if len(records) != 1 { + t.Fatalf("record count mismatch: got %d want 1", len(records)) + } + if got := attempts.Load(); got != 3 { + t.Fatalf("attempt count mismatch: got %d want 3", got) + } +} + +func TestFetchExportLimitedDoesNotRetry400(t *testing.T) { + var attempts atomic.Int32 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + http.Error(w, "bad request", http.StatusBadRequest) + })) + defer ts.Close() + + client := NewClient(ts.URL, ClientOptions{ + MaxAttempts: 5, + BaseDelay: time.Millisecond, + MaxDelay: 2 * time.Millisecond, + }) + _, err := client.FetchExportLimited(context.Background(), 0, 0) + if err == nil { + t.Fatalf("expected error for 400 response") + } + if got := attempts.Load(); got != 1 { + t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got) + } +} diff --git a/internal/ingest/service.go b/internal/ingest/service.go index 8451246..817a8bd 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -24,9 +24,14 @@ import ( ) type Stats struct { - IngestedOps uint64 `json:"ingested_ops"` - Errors uint64 `json:"errors"` - LastSeq uint64 `json:"last_seq"` + IngestedOps uint64 `json:"ingested_ops"` + Errors uint64 `json:"errors"` + LastSeq uint64 `json:"last_seq"` + VerifyFailures uint64 `json:"verify_failures"` + LagOps uint64 `json:"lag_ops"` + DIDCount uint64 `json:"did_count"` + CheckpointSeq uint64 `json:"checkpoint_sequence"` + IngestOpsPerSec float64 `json:"ingest_ops_per_second"` } type Service struct { @@ -43,6 +48,12 @@ type Service struct { runOps uint64 corrupted atomic.Bool corruptErr atomic.Value + startedAt time.Time + metricsSink MetricsSink +} + +type MetricsSink interface { + ObserveCheckpoint(duration time.Duration, sequence uint64) } func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service { @@ -54,6 +65,13 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog engine: state.New(store, cfg.Mode), checkpoints: checkpointMgr, blockLog: blockLog, + startedAt: time.Now(), + } + if didCount, err := countStates(store); err == nil { + atomic.StoreUint64(&s.stats.DIDCount, didCount) + } + if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { + atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) } if cfg.Mode == config.ModeMirror && blockLog != nil { s.appender = newBlockAppender(blockLog, 1024) @@ -119,14 +137,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { return false, err } if len(records) == 0 { + atomic.StoreUint64(&s.stats.LagOps, 0) return false, nil } + latestSourceSeq := records[len(records)-1].Seq committed, err := s.processRecords(ctx, records) s.runOps += committed if err != nil { atomic.AddUint64(&s.stats.Errors, 1) return committed > 0, err } + lastCommitted := atomic.LoadUint64(&s.stats.LastSeq) + if latestSourceSeq > lastCommitted { + atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted) + } else { + atomic.StoreUint64(&s.stats.LagOps, 0) + } return true, nil } @@ -136,10 +162,11 @@ type verifyTask struct { } type verifyResult struct { - index int - op types.ParsedOperation - state types.StateV1 - err error + index int + op types.ParsedOperation + state types.StateV1 + newDID bool + err error } func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) { @@ -178,6 +205,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor continue } if err := s.verifier.VerifyOperation(op, existing); err != nil { + atomic.AddUint64(&s.stats.VerifyFailures, 1) results <- verifyResult{index: task.index, err: err} continue } @@ -187,7 +215,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor continue } cache[op.DID] = cloneState(&next) - results <- verifyResult{index: task.index, op: op, state: next} + results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil} } }(queues[i]) } @@ -287,6 +315,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( pendingOps := make([]storage.OperationMutation, 0, batchSize) pendingSeqs := make([]uint64, 0, batchSize) pendingBlockHashes := map[uint64]string{} + pendingNewDIDs := map[string]struct{}{} var committed uint64 commit := func() error { @@ -314,6 +343,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( lastSeq := pendingSeqs[len(pendingSeqs)-1] atomic.StoreUint64(&s.stats.LastSeq, lastSeq) atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps))) + atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs))) committed += uint64(len(pendingOps)) if s.checkpoints != nil { @@ -329,6 +359,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( pendingOps = pendingOps[:0] pendingSeqs = pendingSeqs[:0] clear(pendingBlockHashes) + clear(pendingNewDIDs) return nil } @@ -350,6 +381,9 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( } pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref}) pendingSeqs = append(pendingSeqs, v.op.Sequence) + if v.newDID { + pendingNewDIDs[v.op.DID] = struct{}{} + } if len(pendingOps) >= batchSize { if err := commit(); err != nil { return committed, err @@ -387,6 +421,10 @@ func (s *Service) createCheckpoint(sequence uint64) error { float64(usage.RSSPeakKB)/1024.0, time.Now().UnixMilli(), ) + atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) + if s.metricsSink != nil { + s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence) + } return nil } @@ -411,10 +449,14 @@ func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) { if err != nil { return types.CheckpointV1{}, err } - cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks) + cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks) if err != nil { return types.CheckpointV1{}, err } + atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence) + if s.metricsSink != nil { + s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence) + } return cp, nil } @@ -473,7 +515,6 @@ func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) { } func (s *Service) VerifyDID(ctx context.Context, did string) error { - _ = ctx if err := s.CorruptionError(); err != nil { return err } @@ -497,6 +538,9 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error { var previous *types.StateV1 for _, seq := range seqs { + if err := ctx.Err(); err != nil { + return err + } ref, ok, err := s.store.GetOpSeqRef(seq) if err != nil { return err @@ -523,7 +567,6 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error { } func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) { - _ = ctx if err := s.CorruptionError(); err != nil { return "", nil, err } @@ -551,6 +594,9 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen var previous *types.StateV1 var tip string for _, seq := range filtered { + if err := ctx.Err(); err != nil { + return "", nil, err + } ref, ok, err := s.store.GetOpSeqRef(seq) if err != nil { return "", nil, err @@ -669,12 +715,37 @@ func (s *Service) Close() { } } +func (s *Service) SetMetricsSink(sink MetricsSink) { + s.metricsSink = sink +} + func (s *Service) Stats() Stats { + ingested := atomic.LoadUint64(&s.stats.IngestedOps) + opsPerSec := 0.0 + if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 { + opsPerSec = float64(ingested) / elapsed + } return Stats{ - IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps), - Errors: atomic.LoadUint64(&s.stats.Errors), - LastSeq: atomic.LoadUint64(&s.stats.LastSeq), + IngestedOps: ingested, + Errors: atomic.LoadUint64(&s.stats.Errors), + LastSeq: atomic.LoadUint64(&s.stats.LastSeq), + VerifyFailures: atomic.LoadUint64(&s.stats.VerifyFailures), + LagOps: atomic.LoadUint64(&s.stats.LagOps), + DIDCount: atomic.LoadUint64(&s.stats.DIDCount), + CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq), + IngestOpsPerSec: opsPerSec, + } +} + +func countStates(store storage.Store) (uint64, error) { + var count uint64 + if err := store.ForEachState(func(types.StateV1) error { + count++ + return nil + }); err != nil { + return 0, err } + return count, nil } type appendResult struct { diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go index d7bd1b6..e01dd4e 100644 --- a/internal/ingest/service_integration_test.go +++ b/internal/ingest/service_integration_test.go @@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord { out = append(out, rec1, rec2, rec3) return out } + +func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedRecords(t) + sourcePath := filepath.Join(tmp, "sample.ndjson") + file, err := os.Create(sourcePath) + if err != nil { + t.Fatalf("create source: %v", err) + } + for _, rec := range records { + b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(file, string(b)); err != nil { + t.Fatalf("write source: %v", err) + } + } + file.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: sourcePath, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: 5, + } + service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath)) + if err := service.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + if err := service.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil { + t.Fatalf("expected cancellation error") + } +} |