aboutsummaryrefslogtreecommitdiff
path: root/internal/api
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:41:45 -0800
committerFuwn <[email protected]>2026-02-26 15:41:45 -0800
commitfec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch)
tree0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /internal/api
parentfeat: add read-only PLC API compatibility endpoints (diff)
downloadplutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz
plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip
feat: Apply Iku formatting
Diffstat (limited to 'internal/api')
-rw-r--r--internal/api/observability.go48
-rw-r--r--internal/api/plc_compatibility_test.go61
-rw-r--r--internal/api/server.go186
-rw-r--r--internal/api/server_checkpoint_test.go9
-rw-r--r--internal/api/server_hardening_test.go25
-rw-r--r--internal/api/server_integration_test.go65
6 files changed, 385 insertions, 9 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go
index ebd7711..4654b28 100644
--- a/internal/api/observability.go
+++ b/internal/api/observability.go
@@ -8,7 +8,6 @@ import (
"strings"
"sync"
"time"
-
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
"github.com/Fuwn/plutia/internal/storage"
@@ -32,9 +31,11 @@ type serverMetrics struct {
func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics {
reg := prometheus.NewRegistry()
+
var diskMu sync.Mutex
var diskCached int64
var diskCachedAt time.Time
+
m := &serverMetrics{
registry: reg,
checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -47,6 +48,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
Help: "Latest checkpoint sequence generated by this mirror.",
}),
}
+
reg.MustRegister(m.checkpointDuration, m.checkpointSequence)
reg.MustRegister(prometheus.NewCounterFunc(
prometheus.CounterOpts{
@@ -55,9 +57,11 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
},
func() float64 {
seq, err := store.GetGlobalSeq()
+
if err != nil {
return 0
}
+
return float64(seq)
},
))
@@ -70,6 +74,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return ingestor.Stats().IngestOpsPerSec
},
))
@@ -82,6 +87,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return float64(ingestor.Stats().LagOps)
},
))
@@ -94,6 +100,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor == nil {
return 0
}
+
return float64(ingestor.Stats().VerifyFailures)
},
))
@@ -104,14 +111,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
},
func() float64 {
diskMu.Lock()
+
defer diskMu.Unlock()
+
if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second {
size, err := dirSize(cfg.DataDir)
+
if err == nil {
diskCached = size
diskCachedAt = time.Now()
}
}
+
return float64(diskCached)
},
))
@@ -124,14 +135,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
if ingestor != nil {
return float64(ingestor.Stats().DIDCount)
}
+
count := uint64(0)
_ = store.ForEachState(func(_ types.StateV1) error {
count++
+
return nil
})
+
return float64(count)
},
))
+
return m
}
@@ -182,18 +197,23 @@ type ipRateLimiter struct {
func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
def := config.Default().RateLimit
+
if cfg.ResolveRPS <= 0 {
cfg.ResolveRPS = def.ResolveRPS
}
+
if cfg.ResolveBurst <= 0 {
cfg.ResolveBurst = def.ResolveBurst
}
+
if cfg.ProofRPS <= 0 {
cfg.ProofRPS = def.ProofRPS
}
+
if cfg.ProofBurst <= 0 {
cfg.ProofBurst = def.ProofBurst
}
+
return &ipRateLimiter{
buckets: map[string]*tokenBucket{},
resolve: endpointPolicy{
@@ -210,7 +230,9 @@ func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter {
func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
now := time.Now()
+
l.mu.Lock()
+
defer l.mu.Unlock()
if now.Sub(l.lastSweep) > 2*time.Minute {
@@ -219,75 +241,99 @@ func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool {
delete(l.buckets, key)
}
}
+
l.lastSweep = now
}
policy := l.resolve
routeKey := "resolve"
+
if class == limiterProof {
policy = l.proof
routeKey = "proof"
}
+
key := routeKey + "|" + ip
b, ok := l.buckets[key]
+
if !ok {
l.buckets[key] = &tokenBucket{
tokens: policy.burst - 1,
last: now,
lastSeen: now,
}
+
return true
}
+
elapsed := now.Sub(b.last).Seconds()
+
if elapsed > 0 {
b.tokens += elapsed * policy.rps
+
if b.tokens > policy.burst {
b.tokens = policy.burst
}
}
+
b.last = now
b.lastSeen = now
+
if b.tokens < 1 {
return false
}
+
b.tokens--
+
return true
}
func clientIP(r *http.Request) string {
if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" {
parts := strings.Split(forwarded, ",")
+
if len(parts) > 0 {
if ip := strings.TrimSpace(parts[0]); ip != "" {
return ip
}
}
}
+
if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" {
return realIP
}
+
host, _, err := net.SplitHostPort(r.RemoteAddr)
+
if err == nil && host != "" {
return host
}
+
return r.RemoteAddr
}
func dirSize(path string) (int64, error) {
var total int64
+
err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
if err != nil {
return err
}
+
if d.IsDir() {
return nil
}
+
info, err := d.Info()
+
if err != nil {
return err
}
+
total += info.Size()
+
return nil
})
+
return total, err
}
diff --git a/internal/api/plc_compatibility_test.go b/internal/api/plc_compatibility_test.go
index 67cbafa..b914278 100644
--- a/internal/api/plc_compatibility_test.go
+++ b/internal/api/plc_compatibility_test.go
@@ -14,7 +14,6 @@ import (
"strings"
"testing"
"time"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
@@ -24,28 +23,37 @@ import (
func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) {
ts, store, _, cleanup := newCompatibilityServer(t)
+
defer cleanup()
state, ok, err := store.GetState("did:plc:alice")
+
if err != nil {
t.Fatalf("get state: %v", err)
}
+
if !ok {
t.Fatalf("state not found")
}
resp, err := http.Get(ts.URL + "/did:plc:alice")
+
if err != nil {
t.Fatalf("get did: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
t.Fatalf("status: got %d want 200", resp.StatusCode)
}
+
if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") {
t.Fatalf("content-type mismatch: %s", got)
}
+
body, _ := io.ReadAll(resp.Body)
+
if strings.TrimSpace(string(body)) != strings.TrimSpace(string(state.DIDDocument)) {
t.Fatalf("did document mismatch\n got: %s\nwant: %s", string(body), string(state.DIDDocument))
}
@@ -53,26 +61,35 @@ func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) {
func TestPLCCompatibilityGetLogOrdered(t *testing.T) {
ts, _, recs, cleanup := newCompatibilityServer(t)
+
defer cleanup()
resp, err := http.Get(ts.URL + "/did:plc:alice/log")
+
if err != nil {
t.Fatalf("get log: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
t.Fatalf("status: got %d want 200", resp.StatusCode)
}
+
var ops []map[string]any
+
if err := json.NewDecoder(resp.Body).Decode(&ops); err != nil {
t.Fatalf("decode log: %v", err)
}
+
if len(ops) != 2 {
t.Fatalf("log length mismatch: got %d want 2", len(ops))
}
+
if _, ok := ops[0]["prev"]; ok {
t.Fatalf("first op should be genesis without prev")
}
+
if prev, _ := ops[1]["prev"].(string); prev != recs[0].CID {
t.Fatalf("second op prev mismatch: got %q want %q", prev, recs[0].CID)
}
@@ -80,29 +97,39 @@ func TestPLCCompatibilityGetLogOrdered(t *testing.T) {
func TestPLCCompatibilityExportCount(t *testing.T) {
ts, _, _, cleanup := newCompatibilityServer(t)
+
defer cleanup()
resp, err := http.Get(ts.URL + "/export?count=2")
+
if err != nil {
t.Fatalf("get export: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
t.Fatalf("status: got %d want 200", resp.StatusCode)
}
+
if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/jsonlines") {
t.Fatalf("content-type mismatch: %s", got)
}
+
body, _ := io.ReadAll(resp.Body)
lines := strings.Split(strings.TrimSpace(string(body)), "\n")
+
if len(lines) != 2 {
t.Fatalf("line count mismatch: got %d want 2", len(lines))
}
+
for _, line := range lines {
var entry map[string]any
+
if err := json.Unmarshal([]byte(line), &entry); err != nil {
t.Fatalf("decode export line: %v", err)
}
+
for _, key := range []string{"did", "operation", "cid", "nullified", "createdAt"} {
if _, ok := entry[key]; !ok {
t.Fatalf("missing export key %q in %v", key, entry)
@@ -113,20 +140,27 @@ func TestPLCCompatibilityExportCount(t *testing.T) {
func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) {
ts, _, _, cleanup := newCompatibilityServer(t)
+
defer cleanup()
req, err := http.NewRequest(http.MethodPost, ts.URL+"/did:plc:alice", strings.NewReader(`{}`))
+
if err != nil {
t.Fatalf("new request: %v", err)
}
+
resp, err := http.DefaultClient.Do(req)
+
if err != nil {
t.Fatalf("post did: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusMethodNotAllowed {
t.Fatalf("status: got %d want 405", resp.StatusCode)
}
+
if allow := resp.Header.Get("Allow"); allow != http.MethodGet {
t.Fatalf("allow header mismatch: got %q want %q", allow, http.MethodGet)
}
@@ -134,14 +168,19 @@ func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) {
func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) {
ts, _, _, cleanup := newCompatibilityServer(t)
+
defer cleanup()
resp, err := http.Get(ts.URL + "/did:plc:alice")
+
if err != nil {
t.Fatalf("get did: %v", err)
}
+
defer resp.Body.Close()
+
body, _ := io.ReadAll(resp.Body)
+
if strings.Contains(string(body), "checkpoint_reference") {
t.Fatalf("compatibility endpoint leaked verification metadata: %s", string(body))
}
@@ -149,48 +188,63 @@ func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) {
func TestPLCCompatibilityProofEndpointStillWorks(t *testing.T) {
ts, _, _, cleanup := newCompatibilityServer(t)
+
defer cleanup()
resp, err := http.Get(ts.URL + "/did/did:plc:alice/proof")
+
if err != nil {
t.Fatalf("get proof: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
+
t.Fatalf("proof status: got %d want 200 body=%s", resp.StatusCode, string(body))
}
}
func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStore, []types.ExportRecord, func()) {
t.Helper()
+
tmp := t.TempDir()
dataDir := filepath.Join(tmp, "data")
+
if err := os.MkdirAll(dataDir, 0o755); err != nil {
t.Fatalf("mkdir data: %v", err)
}
seed := make([]byte, ed25519.SeedSize)
+
if _, err := rand.Read(seed); err != nil {
t.Fatalf("seed: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
recs := buildCheckpointScenarioRecords(t)
sourcePath := filepath.Join(tmp, "records.ndjson")
+
writeRecordsFile(t, sourcePath, recs)
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
if err := store.SetMode(config.ModeMirror); err != nil {
t.Fatalf("set mode: %v", err)
}
+
bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open block log: %v", err)
}
@@ -212,12 +266,15 @@ func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStor
}
cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+
if err := svc.Replay(context.Background()); err != nil {
t.Fatalf("replay: %v", err)
}
+
if err := svc.Flush(context.Background()); err != nil {
t.Fatalf("flush: %v", err)
}
+
if _, err := svc.Snapshot(context.Background()); err != nil {
t.Fatalf("snapshot: %v", err)
}
@@ -226,7 +283,9 @@ func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStor
cleanup := func() {
ts.Close()
svc.Close()
+
_ = store.Close()
}
+
return ts, store, recs, cleanup
}
diff --git a/internal/api/server.go b/internal/api/server.go
index f773145..2a3c589 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"time"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
@@ -44,21 +43,27 @@ func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service,
},
limiter: newIPRateLimiter(cfg.RateLimit),
}
+
for _, opt := range opts {
opt(s)
}
+
s.metrics = newServerMetrics(cfg, store, ingestor)
+
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
s.metrics.checkpointSequence.Set(float64(cp.Sequence))
}
+
if ingestor != nil {
ingestor.SetMetricsSink(s.metrics)
}
+
return s
}
func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
+
mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth)))
mux.Handle("/metrics", s.metrics.Handler())
mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus)))
@@ -67,6 +72,7 @@ func (s *Server) Handler() http.Handler {
mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID)))
mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility)))
mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility)))
+
return mux
}
@@ -76,19 +82,27 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
seq, err := s.store.GetGlobalSeq()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
stats := ingest.Stats{}
+
if s.ingestor != nil {
stats = s.ingestor.Stats()
}
+
payload := map[string]any{
"mode": s.cfg.Mode,
"verify_policy": s.cfg.VerifyPolicy,
@@ -96,161 +110,225 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
"stats": stats,
"build": s.build,
}
+
if s.ingestor != nil {
payload["corrupted"] = s.ingestor.IsCorrupted()
+
if err := s.ingestor.CorruptionError(); err != nil {
payload["corruption_error"] = err.Error()
}
}
+
if ok {
payload["latest_checkpoint"] = cp
}
+
writeJSON(w, http.StatusOK, payload)
}
func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) {
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("no checkpoint"))
+
return
}
+
writeJSON(w, http.StatusOK, cp)
}
func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/checkpoints/")
+
if path == "" {
writeErr(w, http.StatusNotFound, fmt.Errorf("missing checkpoint sequence"))
+
return
}
+
seq, err := strconv.ParseUint(path, 10, 64)
+
if err != nil {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid checkpoint sequence"))
+
return
}
+
cp, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("checkpoint not found"))
+
return
}
+
writeJSON(w, http.StatusOK, cp)
}
func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/did/")
+
if path == "" {
writeErr(w, http.StatusBadRequest, fmt.Errorf("missing did"))
+
return
}
+
if strings.HasSuffix(path, "/proof") {
did := strings.TrimSuffix(path, "/proof")
+
if !s.allowRequest(r, limiterProof) {
writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded"))
+
return
}
+
s.handleDIDProof(w, r, did)
+
return
}
+
if !s.allowRequest(r, limiterResolve) {
writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded"))
+
return
}
+
s.handleDIDResolve(w, r, path)
}
func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) {
state, ok, err := s.store.GetState(did)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
cp, cpOK, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
resp := map[string]any{
"did": did,
"did_document": json.RawMessage(state.DIDDocument),
"chain_tip_hash": state.ChainTipHash,
}
+
if cpOK {
resp["checkpoint_reference"] = map[string]any{
"sequence": cp.Sequence,
"checkpoint_hash": cp.CheckpointHash,
}
}
+
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
if err := s.ingestor.CorruptionError(); err != nil {
writeErr(w, http.StatusServiceUnavailable, err)
+
return
}
cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r)
+
if err != nil {
writeErr(w, http.StatusBadRequest, err)
+
return
}
tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence)
+
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusNotFound, err)
+
return
}
+
siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence)
+
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !found {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not present in checkpoint state"))
+
return
}
leafBytes, err := hex.DecodeString(leafHash)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err))
+
return
}
+
root, err := hex.DecodeString(cp.DIDMerkleRoot)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid checkpoint root"))
+
return
}
+
if !merkle.VerifyProof(leafBytes, siblings, root) {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("inclusion proof failed consistency check"))
+
return
}
if err := verifyCheckpointUnchanged(); err != nil {
writeErr(w, http.StatusConflict, err)
+
return
}
@@ -265,6 +343,7 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri
CheckpointSig: cp.Signature,
CheckpointKeyID: cp.KeyID,
}
+
writeJSON(w, http.StatusOK, map[string]any{
"did": did,
"checkpoint_sequence": cp.Sequence,
@@ -287,28 +366,39 @@ type plcAuditEntry struct {
func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/")
+
if path == "" {
writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+
return
}
+
if path == "export" {
s.handleExportCompatibility(w, r)
+
return
}
+
parts := strings.Split(path, "/")
did := parts[0]
+
if !strings.HasPrefix(did, "did:") {
writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+
return
}
+
if r.Method == http.MethodPost && len(parts) == 1 {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror"))
+
return
}
+
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+
return
}
@@ -330,90 +420,129 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request)
func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) {
state, ok, err := s.store.GetState(did)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
status := http.StatusOK
+
if isTombstonedDIDDocument(state.DIDDocument) {
status = http.StatusGone
}
+
w.Header().Set("Content-Type", "application/did+ld+json")
w.WriteHeader(status)
+
_, _ = w.Write(state.DIDDocument)
}
func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
ops := make([]json.RawMessage, 0, len(logEntries))
+
for _, rec := range logEntries {
ops = append(ops, rec.Operation)
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", ops)
}
func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
+
_, _ = w.Write(rec.Operation)
}
func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
audit := make([]plcAuditEntry, 0, len(logEntries))
+
for _, rec := range logEntries {
audit = append(audit, plcAuditEntry{
DID: did,
@@ -423,27 +552,37 @@ func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *htt
CreatedAt: rec.CreatedAt,
})
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", audit)
}
func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", data)
}
@@ -451,38 +590,51 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+
return
}
+
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
count := 1000
+
if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" {
n, err := strconv.Atoi(rawCount)
+
if err != nil || n < 1 {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter"))
+
return
}
+
if n > 1000 {
n = 1000
}
+
count = n
}
var after time.Time
+
if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" {
parsed, err := time.Parse(time.RFC3339, rawAfter)
+
if err != nil {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter"))
+
return
}
+
after = parsed
}
w.Header().Set("Content-Type", "application/jsonlines")
w.WriteHeader(http.StatusOK)
+
flusher, _ := w.(http.Flusher)
enc := json.NewEncoder(w)
err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error {
@@ -493,14 +645,18 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques
Nullified: rec.Nullified,
CreatedAt: rec.CreatedAt,
}
+
if err := enc.Encode(entry); err != nil {
return err
}
+
if flusher != nil {
flusher.Flush()
}
+
return nil
})
+
if err != nil {
// Response has already started; best effort termination.
return
@@ -511,22 +667,30 @@ func isTombstonedDIDDocument(raw []byte) bool {
if len(raw) == 0 {
return false
}
+
var doc map[string]any
+
if err := json.Unmarshal(raw, &doc); err != nil {
return false
}
+
deactivated, _ := doc["deactivated"].(bool)
+
return deactivated
}
func (s *Server) withTimeout(next http.Handler) http.Handler {
timeout := s.cfg.RequestTimeout
+
if timeout <= 0 {
timeout = 10 * time.Second
}
+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), timeout)
+
defer cancel()
+
next.ServeHTTP(w, r.WithContext(ctx))
})
}
@@ -535,53 +699,70 @@ func (s *Server) allowRequest(r *http.Request, class limiterClass) bool {
if s.limiter == nil {
return true
}
+
return s.limiter.Allow(clientIP(r), class)
}
func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) {
checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint"))
+
if checkpointParam == "" {
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
return types.CheckpointV1{}, nil, err
}
+
if !ok {
return types.CheckpointV1{}, nil, fmt.Errorf("no checkpoint available")
}
+
return cp, func() error {
now, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("latest checkpoint disappeared during request")
}
+
if now.CheckpointHash != cp.CheckpointHash {
return fmt.Errorf("checkpoint advanced during proof generation")
}
+
return nil
}, nil
}
seq, err := strconv.ParseUint(checkpointParam, 10, 64)
+
if err != nil {
return types.CheckpointV1{}, nil, fmt.Errorf("invalid checkpoint query parameter")
}
+
cp, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
return types.CheckpointV1{}, nil, err
}
+
if !ok {
return types.CheckpointV1{}, nil, fmt.Errorf("checkpoint %d unavailable", seq)
}
+
return cp, func() error {
again, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
return err
}
+
if !ok || again.CheckpointHash != cp.CheckpointHash {
return fmt.Errorf("checkpoint %d changed during proof generation", seq)
}
+
return nil
}, nil
}
@@ -592,10 +773,13 @@ func writeJSON(w http.ResponseWriter, code int, v any) {
func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) {
w.Header().Set("Content-Type", "application/json")
+
if strings.TrimSpace(contentType) != "" {
w.Header().Set("Content-Type", contentType)
}
+
w.WriteHeader(code)
+
_ = json.NewEncoder(w).Encode(v)
}
diff --git a/internal/api/server_checkpoint_test.go b/internal/api/server_checkpoint_test.go
index 9a12f61..531f24d 100644
--- a/internal/api/server_checkpoint_test.go
+++ b/internal/api/server_checkpoint_test.go
@@ -5,7 +5,6 @@ import (
"path/filepath"
"strings"
"testing"
-
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/storage"
"github.com/Fuwn/plutia/internal/types"
@@ -14,9 +13,11 @@ import (
func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) {
dataDir := t.TempDir()
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
if err := store.PutCheckpoint(types.CheckpointV1{
@@ -32,9 +33,11 @@ func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) {
srv := NewServer(config.Default(), store, nil, nil)
req := httptest.NewRequest("GET", "/did/did:plc:alice/proof", nil)
cp, verifyUnchanged, err := srv.selectCheckpointForProof(req)
+
if err != nil {
t.Fatalf("select checkpoint: %v", err)
}
+
if cp.Sequence != 10 {
t.Fatalf("selected unexpected checkpoint sequence: got %d want 10", cp.Sequence)
}
@@ -57,9 +60,11 @@ func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) {
func TestSelectCheckpointForProofDetectsHistoricalMutation(t *testing.T) {
dataDir := t.TempDir()
store, err := storage.OpenPebble(filepath.Clean(dataDir))
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
if err := store.PutCheckpoint(types.CheckpointV1{
@@ -75,9 +80,11 @@ func TestSelectCheckpointForProofDetectsHistoricalMutation(t *testing.T) {
srv := NewServer(config.Default(), store, nil, nil)
req := httptest.NewRequest("GET", "/did/did:plc:alice/proof?checkpoint=20", nil)
cp, verifyUnchanged, err := srv.selectCheckpointForProof(req)
+
if err != nil {
t.Fatalf("select historical checkpoint: %v", err)
}
+
if cp.CheckpointHash != "cp-20-a" {
t.Fatalf("selected unexpected checkpoint hash: got %s", cp.CheckpointHash)
}
diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go
index bb9f24c..fcdf87b 100644
--- a/internal/api/server_hardening_test.go
+++ b/internal/api/server_hardening_test.go
@@ -8,7 +8,6 @@ import (
"strings"
"testing"
"time"
-
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/storage"
"github.com/Fuwn/plutia/internal/types"
@@ -16,10 +15,13 @@ import (
func TestResolveRateLimitPerIP(t *testing.T) {
store, err := storage.OpenPebble(t.TempDir())
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
+
if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:alice", DIDDocument: []byte(`{"id":"did:plc:alice"}`), ChainTipHash: "tip", LatestOpSeq: 1, UpdatedAt: time.Now().UTC()}); err != nil {
t.Fatalf("put state: %v", err)
}
@@ -31,11 +33,12 @@ func TestResolveRateLimitPerIP(t *testing.T) {
cfg.RateLimit.ProofRPS = 1
cfg.RateLimit.ProofBurst = 1
h := NewServer(cfg, store, nil, nil).Handler()
-
req1 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil)
req1.RemoteAddr = "203.0.113.7:12345"
rr1 := httptest.NewRecorder()
+
h.ServeHTTP(rr1, req1)
+
if rr1.Code != http.StatusOK {
t.Fatalf("first request status: got %d want %d", rr1.Code, http.StatusOK)
}
@@ -43,7 +46,9 @@ func TestResolveRateLimitPerIP(t *testing.T) {
req2 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil)
req2.RemoteAddr = "203.0.113.7:12345"
rr2 := httptest.NewRecorder()
+
h.ServeHTTP(rr2, req2)
+
if rr2.Code != http.StatusTooManyRequests {
t.Fatalf("second request status: got %d want %d", rr2.Code, http.StatusTooManyRequests)
}
@@ -51,9 +56,11 @@ func TestResolveRateLimitPerIP(t *testing.T) {
func TestStatusIncludesBuildInfo(t *testing.T) {
store, err := storage.OpenPebble(t.TempDir())
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
h := NewServer(config.Default(), store, nil, nil, WithBuildInfo(BuildInfo{
@@ -62,25 +69,31 @@ func TestStatusIncludesBuildInfo(t *testing.T) {
BuildDate: "2026-02-26T00:00:00Z",
GoVersion: "go1.test",
})).Handler()
-
req := httptest.NewRequest(http.MethodGet, "/status", nil)
rr := httptest.NewRecorder()
+
h.ServeHTTP(rr, req)
+
if rr.Code != http.StatusOK {
t.Fatalf("status code: got %d want %d", rr.Code, http.StatusOK)
}
var payload map[string]any
+
if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil {
t.Fatalf("decode status: %v", err)
}
+
build, ok := payload["build"].(map[string]any)
+
if !ok {
t.Fatalf("missing build section: %v", payload)
}
+
if got := build["version"]; got != "v0.1.0" {
t.Fatalf("unexpected build version: %v", got)
}
+
if got := build["commit"]; got != "abc123" {
t.Fatalf("unexpected build commit: %v", got)
}
@@ -88,20 +101,26 @@ func TestStatusIncludesBuildInfo(t *testing.T) {
func TestMetricsExposeRequiredSeries(t *testing.T) {
store, err := storage.OpenPebble(t.TempDir())
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
h := NewServer(config.Default(), store, nil, nil).Handler()
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
rr := httptest.NewRecorder()
+
h.ServeHTTP(rr, req)
+
if rr.Code != http.StatusOK {
t.Fatalf("metrics status: got %d want %d", rr.Code, http.StatusOK)
}
+
body, _ := io.ReadAll(rr.Body)
text := string(body)
+
for _, metric := range []string{
"ingest_ops_total",
"ingest_ops_per_second",
diff --git a/internal/api/server_integration_test.go b/internal/api/server_integration_test.go
index 1736750..edb64bd 100644
--- a/internal/api/server_integration_test.go
+++ b/internal/api/server_integration_test.go
@@ -14,7 +14,6 @@ import (
"path/filepath"
"strings"
"testing"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
@@ -25,32 +24,41 @@ import (
func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) {
tmp := t.TempDir()
dataDir := filepath.Join(tmp, "data")
+
if err := os.MkdirAll(dataDir, 0o755); err != nil {
t.Fatalf("mkdir data: %v", err)
}
keyPath := filepath.Join(tmp, "mirror.key")
seed := make([]byte, ed25519.SeedSize)
+
if _, err := rand.Read(seed); err != nil {
t.Fatalf("seed: %v", err)
}
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
recs := buildCheckpointScenarioRecords(t)
sourcePath := filepath.Join(tmp, "records.ndjson")
+
writeRecordsFile(t, sourcePath, recs)
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
+
if err := store.SetMode(config.ModeMirror); err != nil {
t.Fatalf("set mode: %v", err)
}
+
bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open block log: %v", err)
}
@@ -69,29 +77,37 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) {
}
cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+
if err := svc.Replay(context.Background()); err != nil {
t.Fatalf("replay: %v", err)
}
+
if err := svc.Flush(context.Background()); err != nil {
t.Fatalf("flush: %v", err)
}
cp2, ok, err := store.GetCheckpoint(2)
+
if err != nil || !ok {
t.Fatalf("checkpoint 2 missing: ok=%v err=%v", ok, err)
}
ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler())
+
defer ts.Close()
url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof?checkpoint=2"
resp, err := http.Get(url)
+
if err != nil {
t.Fatalf("get proof: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
bodyBytes, _ := io.ReadAll(resp.Body)
+
t.Fatalf("proof status: %d body=%s", resp.StatusCode, string(bodyBytes))
}
@@ -102,15 +118,19 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) {
MerkleRoot string `json:"merkle_root"`
} `json:"inclusion_proof"`
}
+
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
t.Fatalf("decode response: %v", err)
}
+
if body.CheckpointSequence != 2 {
t.Fatalf("unexpected checkpoint sequence: got %d want 2", body.CheckpointSequence)
}
+
if body.ChainTipReference != recs[0].CID {
t.Fatalf("expected old tip at checkpoint=2: got %s want %s", body.ChainTipReference, recs[0].CID)
}
+
if body.InclusionProof.MerkleRoot != cp2.DIDMerkleRoot {
t.Fatalf("merkle root mismatch: got %s want %s", body.InclusionProof.MerkleRoot, cp2.DIDMerkleRoot)
}
@@ -119,34 +139,44 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) {
func TestCorruptedBlockRefusesProof(t *testing.T) {
tmp := t.TempDir()
dataDir := filepath.Join(tmp, "data")
+
if err := os.MkdirAll(dataDir, 0o755); err != nil {
t.Fatalf("mkdir data: %v", err)
}
seed := make([]byte, ed25519.SeedSize)
+
if _, err := rand.Read(seed); err != nil {
t.Fatalf("seed: %v", err)
}
+
keyPath := filepath.Join(tmp, "mirror.key")
+
if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
t.Fatalf("write key: %v", err)
}
recs := buildCheckpointScenarioRecords(t)
sourcePath := filepath.Join(tmp, "records.ndjson")
+
writeRecordsFile(t, sourcePath, recs)
store, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
if err := store.SetMode(config.ModeMirror); err != nil {
t.Fatalf("set mode: %v", err)
}
+
bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open block log: %v", err)
}
+
cfg := config.Config{
Mode: config.ModeMirror,
DataDir: dataDir,
@@ -161,49 +191,67 @@ func TestCorruptedBlockRefusesProof(t *testing.T) {
}
cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+
if err := svc.Replay(context.Background()); err != nil {
t.Fatalf("replay: %v", err)
}
+
if _, err := svc.Snapshot(context.Background()); err != nil {
t.Fatalf("snapshot: %v", err)
}
+
svc.Close()
+
if err := store.Close(); err != nil {
t.Fatalf("close store: %v", err)
}
blockPath := filepath.Join(dataDir, "ops", "000001.zst")
b, err := os.ReadFile(blockPath)
+
if err != nil {
t.Fatalf("read block: %v", err)
}
+
b[len(b)/2] ^= 0xFF
+
if err := os.WriteFile(blockPath, b, 0o644); err != nil {
t.Fatalf("write corrupted block: %v", err)
}
store2, err := storage.OpenPebble(dataDir)
+
if err != nil {
t.Fatalf("open store2: %v", err)
}
+
defer store2.Close()
+
bl2, err := storage.OpenBlockLog(dataDir, 3, 4)
+
if err != nil {
t.Fatalf("open blocklog2: %v", err)
}
+
svc2 := ingest.NewService(cfg, store2, ingest.NewClient("file:///nonexistent"), bl2, checkpoint.NewManager(store2, dataDir, keyPath))
+
if !svc2.IsCorrupted() {
t.Fatalf("expected service to detect corruption on restart")
}
ts := httptest.NewServer(NewServer(cfg, store2, svc2, checkpoint.NewManager(store2, dataDir, keyPath)).Handler())
+
defer ts.Close()
+
url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof"
resp, err := http.Get(url)
+
if err != nil {
t.Fatalf("request proof: %v", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("expected 503 for corrupted proof, got %d", resp.StatusCode)
}
@@ -211,13 +259,18 @@ func TestCorruptedBlockRefusesProof(t *testing.T) {
func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) {
t.Helper()
+
f, err := os.Create(path)
+
if err != nil {
t.Fatalf("create records file: %v", err)
}
+
defer f.Close()
+
for _, rec := range recs {
b, _ := json.Marshal(rec)
+
if _, err := fmt.Fprintln(f, string(b)); err != nil {
t.Fatalf("write record: %v", err)
}
@@ -226,36 +279,44 @@ func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) {
func buildCheckpointScenarioRecords(t *testing.T) []types.ExportRecord {
t.Helper()
+
pub, priv, err := ed25519.GenerateKey(rand.Reader)
+
if err != nil {
t.Fatalf("generate key: %v", err)
}
+
mk := func(seq uint64, did, prev string) types.ExportRecord {
unsigned := map[string]any{
"did": did,
"didDoc": map[string]any{"id": did, "seq": seq},
"publicKey": base64.RawURLEncoding.EncodeToString(pub),
}
+
if prev != "" {
unsigned["prev"] = prev
}
+
payload, _ := json.Marshal(unsigned)
canon, _ := types.CanonicalizeJSON(payload)
sig := ed25519.Sign(priv, canon)
op := map[string]any{}
+
for k, v := range unsigned {
op[k] = v
}
+
op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon)
op["sig"] = base64.RawURLEncoding.EncodeToString(sig)
raw, _ := json.Marshal(op)
opCanon, _ := types.CanonicalizeJSON(raw)
cid := types.ComputeDigestCID(opCanon)
+
return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: raw}
}
-
r1 := mk(1, "did:plc:alice", "")
r2 := mk(2, "did:plc:bob", "")
r3 := mk(3, "did:plc:alice", r1.CID)
+
return []types.ExportRecord{r1, r2, r3}
}