authorFuwn <[email protected]>2026-02-26 15:07:03 -0800
committerFuwn <[email protected]>2026-02-26 15:07:03 -0800
commitf1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch)
tree4003d041bc46866b7d4b7f0bfb6c6924d991cbb6
parentfeat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff)
downloadplutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz
plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip
feat: harden launch readiness with versioning, metrics, and resilience
-rw-r--r--Makefile16
-rw-r--r--README.md225
-rw-r--r--VERSION1
-rw-r--r--cmd/plutia/main.go132
-rw-r--r--cmd/plutia/version_test.go31
-rw-r--r--config.yaml9
-rw-r--r--docker-compose.yml11
-rw-r--r--go.mod2
-rw-r--r--internal/api/observability.go293
-rw-r--r--internal/api/server.go110
-rw-r--r--internal/api/server_hardening_test.go119
-rw-r--r--internal/checkpoint/checkpoint.go6
-rw-r--r--internal/checkpoint/checkpoint_test.go51
-rw-r--r--internal/config/config.go64
-rw-r--r--internal/ingest/client.go148
-rw-r--r--internal/ingest/client_retry_test.go63
-rw-r--r--internal/ingest/service.go99
-rw-r--r--internal/ingest/service_integration_test.go69
18 files changed, 1319 insertions, 130 deletions
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..f71992c
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+VERSION ?= $(shell cat VERSION 2>/dev/null || echo dev)
+COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
+BUILD_DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
+
+LDFLAGS := -s -w \
+ -X main.version=$(VERSION) \
+ -X main.commit=$(COMMIT) \
+ -X main.buildDate=$(BUILD_DATE)
+
+.PHONY: build test
+
+build:
+ go build -trimpath -ldflags "$(LDFLAGS)" -o ./bin/plutia ./cmd/plutia
+
+test:
+ go test ./...
diff --git a/README.md b/README.md
index 2a4f6f1..ef52544 100644
--- a/README.md
+++ b/README.md
@@ -1,109 +1,174 @@
# Plutia
-Plutia is a verifiable PLC mirror for `plc.directory`, implemented in Go.
-
-It supports:
-
-- `resolver` mode: compact state-only storage.
-- `mirror` mode: full historical operation archive with compressed append-only blocks, Merkle checkpoints, and inclusion proofs.
-
-## Features
-
-- Pebble-backed KV/index store (`data/index`).
-- Mirror-mode append-only operation log (`data/ops/*.zst`) with zstd compression.
-- Deterministic JSON canonicalization (`json.Compact`) with stable operation hashing.
-- Ed25519 signature verification with configurable policy (`full`, `lazy`, `state-only`).
-- Prev-link chain validation for tamper-evident DID histories.
-- Signed checkpoints with DID and block Merkle roots.
-- HTTP API for resolution, proofs, status, and checkpoints.
-- CLI for serving, replay, DID verification, and snapshots.
-- Benchmark command with rolling throughput/CPU/RSS/disk reporting.
-
-## Project Layout
-
-- `cmd/plutia`: CLI entrypoint.
-- `internal/config`: YAML/env configuration.
-- `internal/types`: operation/state/checkpoint models + canonicalization.
-- `internal/storage`: Pebble store + zstd append-only block log.
-- `internal/verify`: signature and chain-link validation.
-- `internal/state`: state materialization and KV updates.
-- `internal/merkle`: Merkle root/proof generation.
-- `internal/checkpoint`: checkpoint creation/signing/persistence.
-- `internal/ingest`: replay/poll ingestion pipeline.
-- `internal/api`: HTTP handlers.
-- `pkg/proof`: proof response structures.
-
-## Configuration
-
-Use `config.yaml` (see included example) and/or environment variables:
-
-- `PLUTIA_MODE`
-- `PLUTIA_DATA_DIR`
-- `PLUTIA_PLC_SOURCE`
-- `PLUTIA_VERIFY`
-- `PLUTIA_ZSTD_LEVEL`
-- `PLUTIA_BLOCK_SIZE_MB`
-- `PLUTIA_CHECKPOINT_INTERVAL`
-- `PLUTIA_COMMIT_BATCH_SIZE`
-- `PLUTIA_VERIFY_WORKERS`
-- `PLUTIA_LISTEN_ADDR`
-- `PLUTIA_MIRROR_PRIVATE_KEY_PATH`
-- `PLUTIA_POLL_INTERVAL`
-
-## Mirror Key
-
-Checkpoints are signed using `mirror_private_key_path`.
-Accepted key formats:
-
-- raw ed25519 seed/private key (`base64url`, `base64`, or `hex`)
-- JSON object: `{"private_key":"..."}`
-
-## CLI
+Plutia is a deterministic, verifiable PLC mirror for `plc.directory` with signed checkpoints and a proof-serving API.
+
+## Key Capabilities
+
+- Mirror and resolver modes.
+- Pebble-backed state/index storage.
+- Compressed append-only operation blocks (`zstd`) in mirror mode.
+- Deterministic canonical operation handling and signature-chain verification.
+- Signed Merkle checkpoints and DID inclusion proof API.
+- Corruption detection and restart-safe ingestion.
+- Storage density of ~1.2 KB/op in benchmarked runs.
+- Ingestion ~45x faster than naive replay in benchmarked runs.
+
+## Trust Model
+
+- Plutia mirrors data from `https://plc.directory`.
+- Plutia validates operation signature chains and prev-link continuity according to configured verification policy.
+- Plutia does **not** alter PLC authority or introduce consensus.
+- Checkpoints are mirror commitments about what this mirror observed and verified, not global consensus.
+
+## Modes
+
+- `mirror`: stores full verifiable operation history (`data/ops/*.zst`) + state + proofs/checkpoints.
+- `resolver`: stores resolved DID state/index only (no op block archive).
+
+## Quick Start
+
+```bash
+go test ./...
+go build ./cmd/plutia
+./plutia serve --config=config.yaml
+```
+
+### CLI Commands
```bash
-plutia serve --config=config.yaml
-plutia replay --config=config.yaml
-plutia verify --config=config.yaml --did=did:plc:xyz
+plutia serve --config=config.yaml [--max-ops=0]
+plutia replay --config=config.yaml [--max-ops=0]
+plutia verify --config=config.yaml --did=did:plc:example
plutia snapshot --config=config.yaml
plutia bench --config=config.yaml --max-ops=200000
+plutia compare --config=config.yaml --remote=https://mirror.example.com
+plutia version
```
+## Versioning and Reproducible Builds
+
+Plutia follows semantic versioning, starting at `v0.1.0`.
+
+`plutia version` prints:
+
+- `Version` (defaults to `dev` if not injected)
+- `Commit`
+- `BuildDate` (UTC RFC3339)
+- `GoVersion`
+
+Build metadata is injected through ldflags:
+
+```bash
+go build -trimpath \
+ -ldflags "-X main.version=v0.1.0 -X main.commit=$(git rev-parse --short HEAD) -X main.buildDate=$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
+ -o ./bin/plutia ./cmd/plutia
+```
+
+`make build` injects the same metadata.
+
## HTTP API
- `GET /health`
-- `GET /metrics`
-- `GET /status`
+- `GET /metrics` (Prometheus)
+- `GET /status` (includes build/version metadata)
- `GET /did/{did}`
- `GET /did/{did}/proof`
- `GET /checkpoints/latest`
- `GET /checkpoints/{sequence}`
-`/did/{did}` returns `did_document`, `chain_tip_hash`, and latest checkpoint reference when available.
+## Metrics and Observability
-`/did/{did}/proof` returns chain-tip reference, operation sequence references, Merkle inclusion proof, and checkpoint signature metadata.
+Prometheus series exposed at `/metrics` include:
-## Ingestion Behavior
+- `ingest_ops_total`
+- `ingest_ops_per_second`
+- `ingest_lag_ops`
+- `verify_failures_total`
+- `checkpoint_duration_seconds`
+- `checkpoint_sequence`
+- `disk_bytes_total`
+- `did_count`
-- Replay from genesis via `plutia replay`.
-- Poll-based incremental ingestion in `serve` mode.
-- Resume from `meta:global_seq`.
-- Parallel DID-partitioned verification with ordered commit.
-- Configurable synced commit batching (`commit_batch_size`).
-- Safe flush on shutdown.
+Operational hardening includes:
-## Verification Policies
+- Per-IP token-bucket rate limits (stricter on proof endpoints).
+- Per-request timeout (default `10s`) with cancellation propagation.
+- Upstream ingestion retries with exponential backoff and `429` handling.
+- Graceful SIGINT/SIGTERM shutdown with flush-before-exit behavior.
-- `full`: verify all operations on ingest.
-- `lazy`: ingest without immediate signature verification.
-- `state-only`: verify latest operations per DID.
+## Running Your Own Mirror
-## Build and Test
+### System Requirements
+
+- Go 1.25+
+- SSD-backed storage recommended
+- RAM: 4GB minimum, 8GB+ recommended for higher throughput
+- CPU: multi-core recommended for parallel verification workers
+
+### Disk Projections
+
+Using benchmarked density (~1.2KB/op total):
+
+- 5,000,000 ops: ~6GB
+- 10,000,000 ops: ~12GB
+
+Always keep extra headroom for compaction, checkpoints, and operational buffers.
+
+### Example `config.yaml`
+
+See [`config.yaml`](./config.yaml). Core knobs:
+
+- `mode`
+- `verify`
+- `commit_batch_size`
+- `verify_workers`
+- `checkpoint_interval`
+- `rate_limit.*`
+- `request_timeout`
+
+### Example `docker-compose.yml`
+
+```yaml
+services:
+ plutia:
+ image: golang:1.25
+ working_dir: /app
+ command: sh -lc "go build -trimpath -o /app/bin/plutia ./cmd/plutia && /app/bin/plutia serve --config=/app/config.yaml"
+ ports:
+ - "8080:8080"
+ volumes:
+ - ./:/app
+ - ./data:/app/data
+ restart: unless-stopped
+```
+
+### Upgrade and Backup Guidance
+
+- Stop the process cleanly (`SIGTERM`) to flush pending writes.
+- Back up `data/index`, `data/ops`, and `data/checkpoints` together.
+- Keep the same `mode` per data directory across restarts.
+- Upgrade binaries in staging first, then in production, keeping the existing on-disk data.
+
+## Mirror Comparison
+
+Use:
```bash
-go test ./...
-go build ./cmd/plutia
+plutia compare --config=config.yaml --remote=https://mirror.example.com
```
+The command fetches the remote `/checkpoints/latest` and compares:
+
+- checkpoint sequence
+- DID Merkle root
+- signature presence
+
+Behavior:
+
+- same sequence + different root => divergence warning and non-zero exit
+- different sequences => reports which mirror is ahead and exits non-zero
+- matching sequence/root/signature presence => success
+
## License
MIT OR Apache-2.0
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..b82608c
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+v0.1.0
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go
index 0849e0a..2685a26 100644
--- a/cmd/plutia/main.go
+++ b/cmd/plutia/main.go
@@ -2,6 +2,7 @@ package main
import (
"context"
+ "encoding/json"
"errors"
"flag"
"fmt"
@@ -11,6 +12,7 @@ import (
"os/exec"
"os/signal"
"path/filepath"
+ "runtime"
"strconv"
"strings"
"syscall"
@@ -21,6 +23,13 @@ import (
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
"github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+var (
+ version = "dev"
+ commit = "unknown"
+ buildDate = "unknown"
)
type app struct {
@@ -58,6 +67,14 @@ func main() {
if err := runBench(os.Args[2:]); err != nil {
log.Fatal(err)
}
+ case "compare":
+ if err := runCompare(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "version":
+ if err := runVersion(); err != nil {
+ log.Fatal(err)
+ }
default:
usage()
os.Exit(2)
@@ -96,6 +113,7 @@ func runServe(args []string) error {
}
errCh := make(chan error, 2)
+ pollDone := make(chan struct{})
go func() {
log.Printf("HTTP server listening on %s", app.cfg.ListenAddr)
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -104,24 +122,32 @@ func runServe(args []string) error {
}()
if !app.service.IsCorrupted() {
go func() {
+ defer close(pollDone)
if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}()
+ } else {
+ close(pollDone)
}
+ var runErr error
select {
case <-ctx.Done():
case err := <-errCh:
- return err
+ runErr = err
+ stop()
}
+ <-pollDone
- shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
- if err := app.service.Flush(shutdownCtx); err != nil {
- return err
+ flushErr := app.service.Flush(shutdownCtx)
+ httpErr := httpSrv.Shutdown(shutdownCtx)
+ if runErr != nil || flushErr != nil || httpErr != nil {
+ return errors.Join(runErr, flushErr, httpErr)
}
- return httpSrv.Shutdown(shutdownCtx)
+ return nil
}
func runReplay(args []string) error {
@@ -270,6 +296,87 @@ func runBench(args []string) error {
return nil
}
+func runCompare(args []string) error {
+ fs := flag.NewFlagSet("compare", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ remote := fs.String("remote", "", "remote mirror base URL")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if strings.TrimSpace(*remote) == "" {
+ return fmt.Errorf("--remote is required")
+ }
+
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+
+ local, ok, err := app.store.GetLatestCheckpoint()
+ if err != nil {
+ return fmt.Errorf("load local checkpoint: %w", err)
+ }
+ if !ok {
+ return fmt.Errorf("local mirror has no checkpoints")
+ }
+
+ url := strings.TrimRight(*remote, "/") + "/checkpoints/latest"
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
+ if err != nil {
+ return err
+ }
+ httpClient := &http.Client{Timeout: app.cfg.RequestTimeout}
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("fetch remote checkpoint: %w", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode)
+ }
+ var remoteCP types.CheckpointV1
+ if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil {
+ return fmt.Errorf("decode remote checkpoint: %w", err)
+ }
+
+ fmt.Printf("local sequence=%d root=%s signature_present=%t\n", local.Sequence, local.DIDMerkleRoot, local.Signature != "")
+ fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "")
+
+ mismatch := false
+ if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot {
+ fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence)
+ mismatch = true
+ }
+ if local.Sequence > remoteCP.Sequence {
+ fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence)
+ mismatch = true
+ } else if remoteCP.Sequence > local.Sequence {
+ fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence)
+ mismatch = true
+ }
+ if (local.Signature == "") != (remoteCP.Signature == "") {
+ fmt.Println("signature presence mismatch between local and remote checkpoints")
+ mismatch = true
+ }
+ if mismatch {
+ return fmt.Errorf("mirror comparison mismatch")
+ }
+ fmt.Println("mirrors match at latest checkpoint")
+ return nil
+}
+
+func runVersion() error {
+ fmt.Print(formatVersion())
+ return nil
+}
+
+func formatVersion() string {
+ return fmt.Sprintf("Version: %s\nCommit: %s\nBuildDate: %s\nGoVersion: %s\n",
+ version, commit, buildDate, runtime.Version())
+}
+
func bootstrap(path string) (*app, error) {
cfg, err := config.Load(path)
if err != nil {
@@ -303,10 +410,19 @@ func bootstrap(path string) (*app, error) {
return nil, err
}
}
- client := ingest.NewClient(cfg.PLCSource)
+ client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{
+ MaxAttempts: cfg.HTTPRetryMaxAttempts,
+ BaseDelay: cfg.HTTPRetryBaseDelay,
+ MaxDelay: cfg.HTTPRetryMaxDelay,
+ })
checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath)
service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr)
- apiServer := api.NewServer(cfg, store, service, checkpointMgr)
+ apiServer := api.NewServer(cfg, store, service, checkpointMgr, api.WithBuildInfo(api.BuildInfo{
+ Version: version,
+ Commit: commit,
+ BuildDate: buildDate,
+ GoVersion: runtime.Version(),
+ }))
return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil
}
@@ -319,6 +435,8 @@ Commands:
plutia verify --config=config.yaml --did=did:plc:xyz
plutia snapshot --config=config.yaml
plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s]
+ plutia compare --config=config.yaml --remote=https://mirror.example.com
+ plutia version
`)
}
diff --git a/cmd/plutia/version_test.go b/cmd/plutia/version_test.go
new file mode 100644
index 0000000..3869de6
--- /dev/null
+++ b/cmd/plutia/version_test.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestFormatVersionIncludesBuildMetadata(t *testing.T) {
+ oldVersion, oldCommit, oldBuildDate := version, commit, buildDate
+ version = "v0.1.0"
+ commit = "abc123"
+ buildDate = "2026-02-26T00:00:00Z"
+ t.Cleanup(func() {
+ version = oldVersion
+ commit = oldCommit
+ buildDate = oldBuildDate
+ })
+
+ out := formatVersion()
+ checks := []string{
+ "Version: v0.1.0",
+ "Commit: abc123",
+ "BuildDate: 2026-02-26T00:00:00Z",
+ "GoVersion: go",
+ }
+ for _, want := range checks {
+ if !strings.Contains(out, want) {
+ t.Fatalf("version output missing %q: %s", want, out)
+ }
+ }
+}
diff --git a/config.yaml b/config.yaml
index 8373155..8e09923 100644
--- a/config.yaml
+++ b/config.yaml
@@ -10,3 +10,12 @@ verify_workers: 10
listen_addr: :8080
mirror_private_key_path: ./mirror.key
poll_interval: 5s
+request_timeout: 10s
+http_retry_max_attempts: 8
+http_retry_base_delay: 250ms
+http_retry_max_delay: 10s
+rate_limit:
+ resolve_rps: 30
+ resolve_burst: 60
+ proof_rps: 10
+ proof_burst: 20
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..03e430e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,11 @@
+services:
+ plutia:
+ image: golang:1.25
+ working_dir: /app
+ command: sh -lc "go build -trimpath -o /app/bin/plutia ./cmd/plutia && /app/bin/plutia serve --config=/app/config.yaml"
+ ports:
+ - "8080:8080"
+ volumes:
+ - ./:/app
+ - ./data:/app/data
+ restart: unless-stopped
diff --git a/go.mod b/go.mod
index e86efe7..6ac4ba0 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@ require (
github.com/fxamacker/cbor/v2 v2.9.0
github.com/klauspost/compress v1.17.11
github.com/mr-tron/base58 v1.2.0
+ github.com/prometheus/client_golang v1.17.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -27,7 +28,6 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
- github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
diff --git a/internal/api/observability.go b/internal/api/observability.go
new file mode 100644
index 0000000..ebd7711
--- /dev/null
+++ b/internal/api/observability.go
@@ -0,0 +1,293 @@
+package api
+
+import (
+ "net"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type BuildInfo struct {
+ Version string `json:"version"`
+ Commit string `json:"commit"`
+ BuildDate string `json:"build_date"`
+ GoVersion string `json:"go_version"`
+}
+
+type serverMetrics struct {
+ registry *prometheus.Registry
+ checkpointDuration prometheus.Histogram
+ checkpointSequence prometheus.Gauge
+}
+
+func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics {
+ reg := prometheus.NewRegistry()
+ var diskMu sync.Mutex
+ var diskCached int64
+ var diskCachedAt time.Time
+ m := &serverMetrics{
+ registry: reg,
+ checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
+ Name: "checkpoint_duration_seconds",
+ Help: "Time spent generating and signing checkpoints.",
+ Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 20, 60},
+ }),
+ checkpointSequence: prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "checkpoint_sequence",
+ Help: "Latest checkpoint sequence generated by this mirror.",
+ }),
+ }
+ reg.MustRegister(m.checkpointDuration, m.checkpointSequence)
+ reg.MustRegister(prometheus.NewCounterFunc(
+ prometheus.CounterOpts{
+ Name: "ingest_ops_total",
+ Help: "Total operations persisted by the mirror.",
+ },
+ func() float64 {
+ seq, err := store.GetGlobalSeq()
+ if err != nil {
+ return 0
+ }
+ return float64(seq)
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "ingest_ops_per_second",
+ Help: "Ingestion operations per second (process-average).",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+ return ingestor.Stats().IngestOpsPerSec
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "ingest_lag_ops",
+ Help: "Difference between latest observed upstream sequence and committed global sequence.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+ return float64(ingestor.Stats().LagOps)
+ },
+ ))
+ reg.MustRegister(prometheus.NewCounterFunc(
+ prometheus.CounterOpts{
+ Name: "verify_failures_total",
+ Help: "Total signature/link verification failures seen during ingestion.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+ return float64(ingestor.Stats().VerifyFailures)
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "disk_bytes_total",
+ Help: "Total bytes used by the configured data directory.",
+ },
+ func() float64 {
+ diskMu.Lock()
+ defer diskMu.Unlock()
+ if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second {
+ size, err := dirSize(cfg.DataDir)
+ if err == nil {
+ diskCached = size
+ diskCachedAt = time.Now()
+ }
+ }
+ return float64(diskCached)
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "did_count",
+ Help: "Number of DIDs currently materialized in state storage.",
+ },
+ func() float64 {
+ if ingestor != nil {
+ return float64(ingestor.Stats().DIDCount)
+ }
+ count := uint64(0)
+ _ = store.ForEachState(func(_ types.StateV1) error {
+ count++
+ return nil
+ })
+ return float64(count)
+ },
+ ))
+ return m
+}
+
+func (m *serverMetrics) Handler() http.Handler {
+ return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
+ EnableOpenMetrics: true,
+ })
+}
+
+func (m *serverMetrics) ObserveCheckpoint(duration time.Duration, sequence uint64) {
+ m.checkpointDuration.Observe(duration.Seconds())
+ m.checkpointSequence.Set(float64(sequence))
+}
+
+type serverOption func(*Server)
+
+func WithBuildInfo(info BuildInfo) serverOption {
+ return func(s *Server) {
+ s.build = info
+ }
+}
+
+type limiterClass int
+
+const (
+ limiterResolve limiterClass = iota
+ limiterProof
+)
+
+type tokenBucket struct {
+ tokens float64
+ last time.Time
+ lastSeen time.Time
+}
+
+type endpointPolicy struct {
+ rps float64
+ burst float64
+}
+
+type ipRateLimiter struct {
+ mu sync.Mutex
+ buckets map[string]*tokenBucket
+ resolve endpointPolicy
+ proof endpointPolicy
+ lastSweep time.Time
+}
+
+func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
+ def := config.Default().RateLimit
+ if cfg.ResolveRPS <= 0 {
+ cfg.ResolveRPS = def.ResolveRPS
+ }
+ if cfg.ResolveBurst <= 0 {
+ cfg.ResolveBurst = def.ResolveBurst
+ }
+ if cfg.ProofRPS <= 0 {
+ cfg.ProofRPS = def.ProofRPS
+ }
+ if cfg.ProofBurst <= 0 {
+ cfg.ProofBurst = def.ProofBurst
+ }
+ return &ipRateLimiter{
+ buckets: map[string]*tokenBucket{},
+ resolve: endpointPolicy{
+ rps: cfg.ResolveRPS,
+ burst: float64(cfg.ResolveBurst),
+ },
+ proof: endpointPolicy{
+ rps: cfg.ProofRPS,
+ burst: float64(cfg.ProofBurst),
+ },
+ lastSweep: time.Now(),
+ }
+}
+
+func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
+ now := time.Now()
+ l.mu.Lock()
+ defer l.mu.Unlock()
+
+ if now.Sub(l.lastSweep) > 2*time.Minute {
+ for key, bucket := range l.buckets {
+ if now.Sub(bucket.lastSeen) > 15*time.Minute {
+ delete(l.buckets, key)
+ }
+ }
+ l.lastSweep = now
+ }
+
+ policy := l.resolve
+ routeKey := "resolve"
+ if class == limiterProof {
+ policy = l.proof
+ routeKey = "proof"
+ }
+ key := routeKey + "|" + ip
+ b, ok := l.buckets[key]
+ if !ok {
+ l.buckets[key] = &tokenBucket{
+ tokens: policy.burst - 1,
+ last: now,
+ lastSeen: now,
+ }
+ return true
+ }
+ elapsed := now.Sub(b.last).Seconds()
+ if elapsed > 0 {
+ b.tokens += elapsed * policy.rps
+ if b.tokens > policy.burst {
+ b.tokens = policy.burst
+ }
+ }
+ b.last = now
+ b.lastSeen = now
+ if b.tokens < 1 {
+ return false
+ }
+ b.tokens--
+ return true
+}
+
+func clientIP(r *http.Request) string {
+ if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" {
+ parts := strings.Split(forwarded, ",")
+ if len(parts) > 0 {
+ if ip := strings.TrimSpace(parts[0]); ip != "" {
+ return ip
+ }
+ }
+ }
+ if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" {
+ return realIP
+ }
+ host, _, err := net.SplitHostPort(r.RemoteAddr)
+ if err == nil && host != "" {
+ return host
+ }
+ return r.RemoteAddr
+}
+
+func dirSize(path string) (int64, error) {
+ var total int64
+ err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+ if d.IsDir() {
+ return nil
+ }
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+ total += info.Size()
+ return nil
+ })
+ return total, err
+}
diff --git a/internal/api/server.go b/internal/api/server.go
index 2159029..f1ffc7c 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -1,12 +1,15 @@
package api
import (
+ "context"
"encoding/hex"
"encoding/json"
+ "errors"
"fmt"
"net/http"
"strconv"
"strings"
+ "time"
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
@@ -22,20 +25,46 @@ type Server struct {
store storage.Store
ingestor *ingest.Service
checkpoints *checkpoint.Manager
+ build BuildInfo
+ limiter *ipRateLimiter
+ metrics *serverMetrics
}
-func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager) *Server {
- return &Server{cfg: cfg, store: store, ingestor: ingestor, checkpoints: checkpoints}
+func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager, opts ...serverOption) *Server {
+ s := &Server{
+ cfg: cfg,
+ store: store,
+ ingestor: ingestor,
+ checkpoints: checkpoints,
+ build: BuildInfo{
+ Version: "dev",
+ Commit: "unknown",
+ BuildDate: "unknown",
+ GoVersion: "unknown",
+ },
+ limiter: newIPRateLimiter(cfg.RateLimit),
+ }
+ for _, opt := range opts {
+ opt(s)
+ }
+ s.metrics = newServerMetrics(cfg, store, ingestor)
+ if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
+ s.metrics.checkpointSequence.Set(float64(cp.Sequence))
+ }
+ if ingestor != nil {
+ ingestor.SetMetricsSink(s.metrics)
+ }
+ return s
}
func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
- mux.HandleFunc("/health", s.handleHealth)
- mux.HandleFunc("/metrics", s.handleMetrics)
- mux.HandleFunc("/status", s.handleStatus)
- mux.HandleFunc("/checkpoints/latest", s.handleLatestCheckpoint)
- mux.HandleFunc("/checkpoints/", s.handleCheckpointBySequence)
- mux.HandleFunc("/did/", s.handleDID)
+ mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth)))
+ mux.Handle("/metrics", s.metrics.Handler())
+ mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus)))
+ mux.Handle("/checkpoints/latest", s.withTimeout(http.HandlerFunc(s.handleLatestCheckpoint)))
+ mux.Handle("/checkpoints/", s.withTimeout(http.HandlerFunc(s.handleCheckpointBySequence)))
+ mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID)))
return mux
}
@@ -43,15 +72,6 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}
-func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
- st := s.ingestor.Stats()
- seq, _ := s.store.GetGlobalSeq()
- w.Header().Set("Content-Type", "text/plain; version=0.0.4")
- _, _ = fmt.Fprintf(w, "plutia_ingested_ops %d\n", st.IngestedOps)
- _, _ = fmt.Fprintf(w, "plutia_ingest_errors %d\n", st.Errors)
- _, _ = fmt.Fprintf(w, "plutia_last_seq %d\n", seq)
-}
-
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
seq, err := s.store.GetGlobalSeq()
if err != nil {
@@ -63,15 +83,22 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
writeErr(w, http.StatusInternalServerError, err)
return
}
+ stats := ingest.Stats{}
+ if s.ingestor != nil {
+ stats = s.ingestor.Stats()
+ }
payload := map[string]any{
"mode": s.cfg.Mode,
"verify_policy": s.cfg.VerifyPolicy,
"global_seq": seq,
- "stats": s.ingestor.Stats(),
- "corrupted": s.ingestor.IsCorrupted(),
+ "stats": stats,
+ "build": s.build,
}
- if err := s.ingestor.CorruptionError(); err != nil {
- payload["corruption_error"] = err.Error()
+ if s.ingestor != nil {
+ payload["corrupted"] = s.ingestor.IsCorrupted()
+ if err := s.ingestor.CorruptionError(); err != nil {
+ payload["corruption_error"] = err.Error()
+ }
}
if ok {
payload["latest_checkpoint"] = cp
@@ -123,9 +150,17 @@ func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
}
if strings.HasSuffix(path, "/proof") {
did := strings.TrimSuffix(path, "/proof")
+ if !s.allowRequest(r, limiterProof) {
+ writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded"))
+ return
+ }
s.handleDIDProof(w, r, did)
return
}
+ if !s.allowRequest(r, limiterResolve) {
+ writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded"))
+ return
+ }
s.handleDIDResolve(w, r, path)
}
@@ -159,6 +194,10 @@ func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did st
}
func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
if err := s.ingestor.CorruptionError(); err != nil {
writeErr(w, http.StatusServiceUnavailable, err)
return
@@ -172,11 +211,19 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri
tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence)
if err != nil {
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
writeErr(w, http.StatusNotFound, err)
return
}
- siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(did, tipHash, cp.Sequence)
+ siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence)
if err != nil {
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
writeErr(w, http.StatusInternalServerError, err)
return
}
@@ -228,6 +275,25 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri
})
}
+func (s *Server) withTimeout(next http.Handler) http.Handler {
+ timeout := s.cfg.RequestTimeout
+ if timeout <= 0 {
+ timeout = 10 * time.Second
+ }
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ ctx, cancel := context.WithTimeout(r.Context(), timeout)
+ defer cancel()
+ next.ServeHTTP(w, r.WithContext(ctx))
+ })
+}
+
+func (s *Server) allowRequest(r *http.Request, class limiterClass) bool {
+ if s.limiter == nil {
+ return true
+ }
+ return s.limiter.Allow(clientIP(r), class)
+}
+
func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) {
checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint"))
if checkpointParam == "" {
diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go
new file mode 100644
index 0000000..bb9f24c
--- /dev/null
+++ b/internal/api/server_hardening_test.go
@@ -0,0 +1,119 @@
+package api
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+func TestResolveRateLimitPerIP(t *testing.T) {
+ store, err := storage.OpenPebble(t.TempDir())
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:alice", DIDDocument: []byte(`{"id":"did:plc:alice"}`), ChainTipHash: "tip", LatestOpSeq: 1, UpdatedAt: time.Now().UTC()}); err != nil {
+ t.Fatalf("put state: %v", err)
+ }
+
+ cfg := config.Default()
+ cfg.RequestTimeout = 10 * time.Second
+ cfg.RateLimit.ResolveRPS = 1
+ cfg.RateLimit.ResolveBurst = 1
+ cfg.RateLimit.ProofRPS = 1
+ cfg.RateLimit.ProofBurst = 1
+ h := NewServer(cfg, store, nil, nil).Handler()
+
+ req1 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil)
+ req1.RemoteAddr = "203.0.113.7:12345"
+ rr1 := httptest.NewRecorder()
+ h.ServeHTTP(rr1, req1)
+ if rr1.Code != http.StatusOK {
+ t.Fatalf("first request status: got %d want %d", rr1.Code, http.StatusOK)
+ }
+
+ req2 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil)
+ req2.RemoteAddr = "203.0.113.7:12345"
+ rr2 := httptest.NewRecorder()
+ h.ServeHTTP(rr2, req2)
+ if rr2.Code != http.StatusTooManyRequests {
+ t.Fatalf("second request status: got %d want %d", rr2.Code, http.StatusTooManyRequests)
+ }
+}
+
+func TestStatusIncludesBuildInfo(t *testing.T) {
+ store, err := storage.OpenPebble(t.TempDir())
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ h := NewServer(config.Default(), store, nil, nil, WithBuildInfo(BuildInfo{
+ Version: "v0.1.0",
+ Commit: "abc123",
+ BuildDate: "2026-02-26T00:00:00Z",
+ GoVersion: "go1.test",
+ })).Handler()
+
+ req := httptest.NewRequest(http.MethodGet, "/status", nil)
+ rr := httptest.NewRecorder()
+ h.ServeHTTP(rr, req)
+ if rr.Code != http.StatusOK {
+ t.Fatalf("status code: got %d want %d", rr.Code, http.StatusOK)
+ }
+
+ var payload map[string]any
+ if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil {
+ t.Fatalf("decode status: %v", err)
+ }
+ build, ok := payload["build"].(map[string]any)
+ if !ok {
+ t.Fatalf("missing build section: %v", payload)
+ }
+ if got := build["version"]; got != "v0.1.0" {
+ t.Fatalf("unexpected build version: %v", got)
+ }
+ if got := build["commit"]; got != "abc123" {
+ t.Fatalf("unexpected build commit: %v", got)
+ }
+}
+
+func TestMetricsExposeRequiredSeries(t *testing.T) {
+ store, err := storage.OpenPebble(t.TempDir())
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ h := NewServer(config.Default(), store, nil, nil).Handler()
+ req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+ rr := httptest.NewRecorder()
+ h.ServeHTTP(rr, req)
+ if rr.Code != http.StatusOK {
+ t.Fatalf("metrics status: got %d want %d", rr.Code, http.StatusOK)
+ }
+ body, _ := io.ReadAll(rr.Body)
+ text := string(body)
+ for _, metric := range []string{
+ "ingest_ops_total",
+ "ingest_ops_per_second",
+ "ingest_lag_ops",
+ "verify_failures_total",
+ "checkpoint_duration_seconds",
+ "checkpoint_sequence",
+ "disk_bytes_total",
+ "did_count",
+ } {
+ if !strings.Contains(text, metric) {
+ t.Fatalf("metrics output missing %q", metric)
+ }
+ }
+}
diff --git a/internal/checkpoint/checkpoint.go b/internal/checkpoint/checkpoint.go
index 840fac3..c3ed0da 100644
--- a/internal/checkpoint/checkpoint.go
+++ b/internal/checkpoint/checkpoint.go
@@ -2,6 +2,7 @@ package checkpoint
import (
"bufio"
+ "context"
"crypto/ed25519"
"crypto/sha256"
"encoding/base64"
@@ -107,7 +108,7 @@ func (m *Manager) signAndPersist(sequence uint64, didRoot string, blockHashes []
return unsigned, nil
}
-func (m *Manager) BuildDIDProofAtCheckpoint(did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) {
+func (m *Manager) BuildDIDProofAtCheckpoint(ctx context.Context, did, chainTipHash string, checkpointSeq uint64) ([]merkle.Sibling, string, bool, error) {
snapshot, err := m.LoadStateSnapshot(checkpointSeq)
if err != nil {
return nil, "", false, err
@@ -116,6 +117,9 @@ func (m *Manager) BuildDIDProofAtCheckpoint(did, chainTipHash string, checkpoint
index := -1
leafHashHex := ""
for i, s := range snapshot.Leaves {
+ if err := ctx.Err(); err != nil {
+ return nil, "", false, err
+ }
h := merkle.HashLeaf([]byte(s.DID + s.ChainTipHash))
leaves[i] = h
if s.DID == did && s.ChainTipHash == chainTipHash {
diff --git a/internal/checkpoint/checkpoint_test.go b/internal/checkpoint/checkpoint_test.go
index 9ca9b2a..8149129 100644
--- a/internal/checkpoint/checkpoint_test.go
+++ b/internal/checkpoint/checkpoint_test.go
@@ -1,9 +1,11 @@
package checkpoint
import (
+ "context"
"crypto/ed25519"
"crypto/rand"
"encoding/base64"
+ "encoding/json"
"os"
"path/filepath"
"testing"
@@ -103,3 +105,52 @@ func TestCheckpointRootStability(t *testing.T) {
t.Fatalf("block root changed for identical block set: %s vs %s", cp1.BlockMerkleRoot, cp2.BlockMerkleRoot)
}
}
+
+func TestBuildDIDProofAtCheckpointHonorsContextCancellation(t *testing.T) {
+ tmp := t.TempDir()
+ store, err := storage.OpenPebble(tmp)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ _, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(priv)), 0o600); err != nil {
+ t.Fatalf("write key: %v", err)
+ }
+ checkpointsDir := filepath.Join(tmp, "checkpoints")
+ if err := os.MkdirAll(checkpointsDir, 0o755); err != nil {
+ t.Fatalf("mkdir checkpoints: %v", err)
+ }
+ leaves := make([]types.DIDLeaf, 500)
+ for i := range leaves {
+ leaves[i] = types.DIDLeaf{
+ DID: "did:plc:test-" + string(rune('a'+(i%26))),
+ ChainTipHash: "tip",
+ }
+ }
+ snapshot := types.CheckpointStateSnapshotV1{
+ Version: 1,
+ Sequence: 10,
+ CreatedAt: "2026-02-26T00:00:00Z",
+ Leaves: leaves,
+ }
+ b, err := json.Marshal(snapshot)
+ if err != nil {
+ t.Fatalf("marshal snapshot: %v", err)
+ }
+ if err := os.WriteFile(filepath.Join(checkpointsDir, "00000000000000000010.state.json"), b, 0o644); err != nil {
+ t.Fatalf("write snapshot: %v", err)
+ }
+
+ mgr := NewManager(store, tmp, keyPath)
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ if _, _, _, err := mgr.BuildDIDProofAtCheckpoint(ctx, "did:plc:test-a", "tip", 10); err == nil {
+ t.Fatalf("expected context cancellation error")
+ }
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index 5e6e467..609bde5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -34,6 +34,18 @@ type Config struct {
ListenAddr string `yaml:"listen_addr"`
MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"`
PollInterval time.Duration `yaml:"poll_interval"`
+ RequestTimeout time.Duration `yaml:"request_timeout"`
+ RateLimit RateLimit `yaml:"rate_limit"`
+ HTTPRetryMaxAttempts int `yaml:"http_retry_max_attempts"`
+ HTTPRetryBaseDelay time.Duration `yaml:"http_retry_base_delay"`
+ HTTPRetryMaxDelay time.Duration `yaml:"http_retry_max_delay"`
+}
+
+type RateLimit struct {
+ ResolveRPS float64 `yaml:"resolve_rps"`
+ ResolveBurst int `yaml:"resolve_burst"`
+ ProofRPS float64 `yaml:"proof_rps"`
+ ProofBurst int `yaml:"proof_burst"`
}
func Default() Config {
@@ -50,6 +62,16 @@ func Default() Config {
ListenAddr: ":8080",
MirrorPrivateKeyPath: "./mirror.key",
PollInterval: 5 * time.Second,
+ RequestTimeout: 10 * time.Second,
+ RateLimit: RateLimit{
+ ResolveRPS: 30,
+ ResolveBurst: 60,
+ ProofRPS: 10,
+ ProofBurst: 20,
+ },
+ HTTPRetryMaxAttempts: 8,
+ HTTPRetryBaseDelay: 250 * time.Millisecond,
+ HTTPRetryMaxDelay: 10 * time.Second,
}
}
@@ -77,6 +99,13 @@ func applyEnv(cfg *Config) {
*dst = v
}
}
+ setFloat64 := func(key string, dst *float64) {
+ if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+ if n, err := strconv.ParseFloat(v, 64); err == nil {
+ *dst = n
+ }
+ }
+ }
setInt := func(key string, dst *int) {
if v := strings.TrimSpace(os.Getenv(key)); v != "" {
if n, err := strconv.Atoi(v); err == nil {
@@ -111,6 +140,14 @@ func applyEnv(cfg *Config) {
setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr)
setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath)
setDuration("PLUTIA_POLL_INTERVAL", &cfg.PollInterval)
+ setDuration("PLUTIA_REQUEST_TIMEOUT", &cfg.RequestTimeout)
+ setFloat64("PLUTIA_RATE_LIMIT_RESOLVE_RPS", &cfg.RateLimit.ResolveRPS)
+ setInt("PLUTIA_RATE_LIMIT_RESOLVE_BURST", &cfg.RateLimit.ResolveBurst)
+ setFloat64("PLUTIA_RATE_LIMIT_PROOF_RPS", &cfg.RateLimit.ProofRPS)
+ setInt("PLUTIA_RATE_LIMIT_PROOF_BURST", &cfg.RateLimit.ProofBurst)
+ setInt("PLUTIA_HTTP_RETRY_MAX_ATTEMPTS", &cfg.HTTPRetryMaxAttempts)
+ setDuration("PLUTIA_HTTP_RETRY_BASE_DELAY", &cfg.HTTPRetryBaseDelay)
+ setDuration("PLUTIA_HTTP_RETRY_MAX_DELAY", &cfg.HTTPRetryMaxDelay)
}
func (c Config) Validate() error {
@@ -149,5 +186,32 @@ func (c Config) Validate() error {
if c.PollInterval <= 0 {
return errors.New("poll_interval must be > 0")
}
+ if c.RequestTimeout <= 0 {
+ return errors.New("request_timeout must be > 0")
+ }
+ if c.RateLimit.ResolveRPS <= 0 {
+ return errors.New("rate_limit.resolve_rps must be > 0")
+ }
+ if c.RateLimit.ResolveBurst <= 0 {
+ return errors.New("rate_limit.resolve_burst must be > 0")
+ }
+ if c.RateLimit.ProofRPS <= 0 {
+ return errors.New("rate_limit.proof_rps must be > 0")
+ }
+ if c.RateLimit.ProofBurst <= 0 {
+ return errors.New("rate_limit.proof_burst must be > 0")
+ }
+ if c.HTTPRetryMaxAttempts < 1 || c.HTTPRetryMaxAttempts > 32 {
+ return fmt.Errorf("http_retry_max_attempts must be between 1 and 32, got %d", c.HTTPRetryMaxAttempts)
+ }
+ if c.HTTPRetryBaseDelay <= 0 {
+ return errors.New("http_retry_base_delay must be > 0")
+ }
+ if c.HTTPRetryMaxDelay <= 0 {
+ return errors.New("http_retry_max_delay must be > 0")
+ }
+ if c.HTTPRetryBaseDelay > c.HTTPRetryMaxDelay {
+ return errors.New("http_retry_base_delay must be <= http_retry_max_delay")
+ }
return nil
}
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index 0f9ad43..d25b73f 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -8,11 +8,13 @@ import (
"errors"
"fmt"
"io"
+ "log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -22,9 +24,32 @@ import (
type Client struct {
source string
http *http.Client
+ opts ClientOptions
}
-func NewClient(source string) *Client {
+type ClientOptions struct {
+ MaxAttempts int
+ BaseDelay time.Duration
+ MaxDelay time.Duration
+}
+
+func NewClient(source string, opts ...ClientOptions) *Client {
+ cfg := ClientOptions{
+ MaxAttempts: 8,
+ BaseDelay: 250 * time.Millisecond,
+ MaxDelay: 10 * time.Second,
+ }
+ if len(opts) > 0 {
+ if opts[0].MaxAttempts > 0 {
+ cfg.MaxAttempts = opts[0].MaxAttempts
+ }
+ if opts[0].BaseDelay > 0 {
+ cfg.BaseDelay = opts[0].BaseDelay
+ }
+ if opts[0].MaxDelay > 0 {
+ cfg.MaxDelay = opts[0].MaxDelay
+ }
+ }
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -37,6 +62,7 @@ func NewClient(source string) *Client {
}
return &Client{
source: strings.TrimRight(source, "/"),
+ opts: cfg,
http: &http.Client{
Timeout: 60 * time.Second,
Transport: transport,
@@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
return c.fetchFromFile(after, limit)
}
+
u, err := url.Parse(c.source)
if err != nil {
return nil, fmt.Errorf("parse plc source: %w", err)
@@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
- resp, err := c.http.Do(req)
+ maxAttempts := c.opts.MaxAttempts
+ if maxAttempts < 1 {
+ maxAttempts = 1
+ }
+ var lastErr error
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+ if err == nil {
+ return records, nil
+ }
+ lastErr = err
+ if !retryable || attempt == maxAttempts || ctx.Err() != nil {
+ break
+ }
+ delay := retryAfter
+ if delay <= 0 {
+ delay = c.backoffDelay(attempt)
+ }
+ log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+ timer := time.NewTimer(delay)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return nil, ctx.Err()
+ case <-timer.C:
+ }
+ }
+ return nil, lastErr
+}
+
+type httpStatusError struct {
+ StatusCode int
+ Body string
+}
+
+func (e httpStatusError) Error() string {
+ return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
+}
+
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+ reqClone := req.Clone(req.Context())
+ reqClone.Header.Set("Accept-Encoding", "gzip")
+ resp, err := c.http.Do(reqClone)
if err != nil {
- return nil, fmt.Errorf("fetch export: %w", err)
+ return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
}
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
- return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
+ body := strings.TrimSpace(string(b))
+ retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
+ err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+ return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+ }
+ records, err := decodeExportBody(resp.Body, limit)
+ if err != nil {
+ return nil, 0, false, err
+ }
+ return records, 0, false, nil
+}
+
+func (c *Client) backoffDelay(attempt int) time.Duration {
+ if attempt < 1 {
+ attempt = 1
+ }
+ delay := c.opts.BaseDelay
+ if delay <= 0 {
+ delay = 250 * time.Millisecond
+ }
+ maxDelay := c.opts.MaxDelay
+ if maxDelay <= 0 {
+ maxDelay = 10 * time.Second
+ }
+ for i := 1; i < attempt; i++ {
+ delay *= 2
+ if delay >= maxDelay {
+ return maxDelay
+ }
+ }
+ if delay > maxDelay {
+ return maxDelay
+ }
+ return delay
+}
+
+func shouldRetryStatus(status int) bool {
+ switch status {
+ case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
+ return true
+ default:
+ return false
+ }
+}
+
+func parseRetryAfter(v string) time.Duration {
+ v = strings.TrimSpace(v)
+ if v == "" {
+ return 0
+ }
+ if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
+ return time.Duration(secs) * time.Second
+ }
+ if ts, err := http.ParseTime(v); err == nil {
+ delay := time.Until(ts)
+ if delay > 0 {
+ return delay
+ }
+ }
+ return 0
+}
+
+func isTransientNetworkErr(err error) bool {
+ if err == nil {
+ return false
+ }
+ var netErr net.Error
+ if errors.As(err, &netErr) {
+ return true
}
- return decodeExportBody(resp.Body, limit)
+ return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go
new file mode 100644
index 0000000..a23fceb
--- /dev/null
+++ b/internal/ingest/client_retry_test.go
@@ -0,0 +1,63 @@
+package ingest
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
+ var attempts atomic.Int32
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ n := attempts.Add(1)
+ if n <= 2 {
+ w.Header().Set("Retry-After", "0")
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+ return
+ }
+ _, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`)
+ }))
+ defer ts.Close()
+
+ client := NewClient(ts.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ records, err := client.FetchExportLimited(context.Background(), 0, 0)
+ if err != nil {
+ t.Fatalf("fetch export: %v", err)
+ }
+ if len(records) != 1 {
+ t.Fatalf("record count mismatch: got %d want 1", len(records))
+ }
+ if got := attempts.Load(); got != 3 {
+ t.Fatalf("attempt count mismatch: got %d want 3", got)
+ }
+}
+
+func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
+ var attempts atomic.Int32
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ attempts.Add(1)
+ http.Error(w, "bad request", http.StatusBadRequest)
+ }))
+ defer ts.Close()
+
+ client := NewClient(ts.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ _, err := client.FetchExportLimited(context.Background(), 0, 0)
+ if err == nil {
+ t.Fatalf("expected error for 400 response")
+ }
+ if got := attempts.Load(); got != 1 {
+ t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got)
+ }
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 8451246..817a8bd 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -24,9 +24,14 @@ import (
)
type Stats struct {
- IngestedOps uint64 `json:"ingested_ops"`
- Errors uint64 `json:"errors"`
- LastSeq uint64 `json:"last_seq"`
+ IngestedOps uint64 `json:"ingested_ops"`
+ Errors uint64 `json:"errors"`
+ LastSeq uint64 `json:"last_seq"`
+ VerifyFailures uint64 `json:"verify_failures"`
+ LagOps uint64 `json:"lag_ops"`
+ DIDCount uint64 `json:"did_count"`
+ CheckpointSeq uint64 `json:"checkpoint_sequence"`
+ IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
}
type Service struct {
@@ -43,6 +48,12 @@ type Service struct {
runOps uint64
corrupted atomic.Bool
corruptErr atomic.Value
+ startedAt time.Time
+ metricsSink MetricsSink
+}
+
+type MetricsSink interface {
+ ObserveCheckpoint(duration time.Duration, sequence uint64)
}
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
@@ -54,6 +65,13 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
engine: state.New(store, cfg.Mode),
checkpoints: checkpointMgr,
blockLog: blockLog,
+ startedAt: time.Now(),
+ }
+ if didCount, err := countStates(store); err == nil {
+ atomic.StoreUint64(&s.stats.DIDCount, didCount)
+ }
+ if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
}
if cfg.Mode == config.ModeMirror && blockLog != nil {
s.appender = newBlockAppender(blockLog, 1024)
@@ -119,14 +137,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
return false, err
}
if len(records) == 0 {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
return false, nil
}
+ latestSourceSeq := records[len(records)-1].Seq
committed, err := s.processRecords(ctx, records)
s.runOps += committed
if err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
return committed > 0, err
}
+ lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+ if latestSourceSeq > lastCommitted {
+ atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
+ } else {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
+ }
return true, nil
}
@@ -136,10 +162,11 @@ type verifyTask struct {
}
type verifyResult struct {
- index int
- op types.ParsedOperation
- state types.StateV1
- err error
+ index int
+ op types.ParsedOperation
+ state types.StateV1
+ newDID bool
+ err error
}
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
@@ -178,6 +205,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
if err := s.verifier.VerifyOperation(op, existing); err != nil {
+ atomic.AddUint64(&s.stats.VerifyFailures, 1)
results <- verifyResult{index: task.index, err: err}
continue
}
@@ -187,7 +215,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
cache[op.DID] = cloneState(&next)
- results <- verifyResult{index: task.index, op: op, state: next}
+ results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
}
}(queues[i])
}
@@ -287,6 +315,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps := make([]storage.OperationMutation, 0, batchSize)
pendingSeqs := make([]uint64, 0, batchSize)
pendingBlockHashes := map[uint64]string{}
+ pendingNewDIDs := map[string]struct{}{}
var committed uint64
commit := func() error {
@@ -314,6 +343,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
lastSeq := pendingSeqs[len(pendingSeqs)-1]
atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
+ atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
committed += uint64(len(pendingOps))
if s.checkpoints != nil {
@@ -329,6 +359,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps = pendingOps[:0]
pendingSeqs = pendingSeqs[:0]
clear(pendingBlockHashes)
+ clear(pendingNewDIDs)
return nil
}
@@ -350,6 +381,9 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
}
pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
pendingSeqs = append(pendingSeqs, v.op.Sequence)
+ if v.newDID {
+ pendingNewDIDs[v.op.DID] = struct{}{}
+ }
if len(pendingOps) >= batchSize {
if err := commit(); err != nil {
return committed, err
@@ -387,6 +421,10 @@ func (s *Service) createCheckpoint(sequence uint64) error {
float64(usage.RSSPeakKB)/1024.0,
time.Now().UnixMilli(),
)
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return nil
}
@@ -411,10 +449,14 @@ func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
if err != nil {
return types.CheckpointV1{}, err
}
- cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
+ cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
if err != nil {
return types.CheckpointV1{}, err
}
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return cp, nil
}
@@ -473,7 +515,6 @@ func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
}
func (s *Service) VerifyDID(ctx context.Context, did string) error {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return err
}
@@ -497,6 +538,9 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
var previous *types.StateV1
for _, seq := range seqs {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return err
@@ -523,7 +567,6 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
}
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return "", nil, err
}
@@ -551,6 +594,9 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
var previous *types.StateV1
var tip string
for _, seq := range filtered {
+ if err := ctx.Err(); err != nil {
+ return "", nil, err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return "", nil, err
@@ -669,12 +715,37 @@ func (s *Service) Close() {
}
}
+func (s *Service) SetMetricsSink(sink MetricsSink) {
+ s.metricsSink = sink
+}
+
func (s *Service) Stats() Stats {
+ ingested := atomic.LoadUint64(&s.stats.IngestedOps)
+ opsPerSec := 0.0
+ if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
+ opsPerSec = float64(ingested) / elapsed
+ }
return Stats{
- IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
- Errors: atomic.LoadUint64(&s.stats.Errors),
- LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ IngestedOps: ingested,
+ Errors: atomic.LoadUint64(&s.stats.Errors),
+ LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ VerifyFailures: atomic.LoadUint64(&s.stats.VerifyFailures),
+ LagOps: atomic.LoadUint64(&s.stats.LagOps),
+ DIDCount: atomic.LoadUint64(&s.stats.DIDCount),
+ CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq),
+ IngestOpsPerSec: opsPerSec,
+ }
+}
+
+func countStates(store storage.Store) (uint64, error) {
+ var count uint64
+ if err := store.ForEachState(func(types.StateV1) error {
+ count++
+ return nil
+ }); err != nil {
+ return 0, err
}
+ return count, nil
}
type appendResult struct {
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
index d7bd1b6..e01dd4e 100644
--- a/internal/ingest/service_integration_test.go
+++ b/internal/ingest/service_integration_test.go
@@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
out = append(out, rec1, rec2, rec3)
return out
}
+
+func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedRecords(t)
+ sourcePath := filepath.Join(tmp, "sample.ndjson")
+ file, err := os.Create(sourcePath)
+ if err != nil {
+ t.Fatalf("create source: %v", err)
+ }
+ for _, rec := range records {
+ b, _ := json.Marshal(rec)
+ if _, err := fmt.Fprintln(file, string(b)); err != nil {
+ t.Fatalf("write source: %v", err)
+ }
+ }
+ file.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5 * time.Second,
+ }
+ service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
+ t.Fatalf("expected cancellation error")
+ }
+}