aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service_export_integrity_test.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 08:48:17 -0800
committerFuwn <[email protected]>2026-02-27 08:48:17 -0800
commit219b220b99b509e65cb1325ba67f00264e7cc25c (patch)
tree746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 /internal/ingest/service_export_integrity_test.go
parentchore: improve user-facing API copy clarity and consistency (diff)
downloadplutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz
plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/service_export_integrity_test.go')
-rw-r--r--internal/ingest/service_export_integrity_test.go357
1 files changed, 357 insertions, 0 deletions
diff --git a/internal/ingest/service_export_integrity_test.go b/internal/ingest/service_export_integrity_test.go
new file mode 100644
index 0000000..92a1703
--- /dev/null
+++ b/internal/ingest/service_export_integrity_test.go
@@ -0,0 +1,357 @@
+package ingest
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+// TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps drives the
+// mirror replay loop against a fake export server that (a) cuts the last
+// JSON line of one page mid-record and (b) rate-limits a page once, then
+// serves a retry that overlaps an already-committed seq. It asserts that
+// replay still commits all 6 records exactly once, with no seq gaps, and
+// that the replay trace records the truncation and the dedup decision.
+func TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ // Fresh ed25519 seed, written base64url-encoded, serves as the mirror's
+ // on-disk signing key for this test run.
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedChainRecords(t, 6)
+
+ // attempts counts requests per "after" cursor so the handler can fail
+ // only the FIRST fetch of a given page (the 429 below). Guarded by mu
+ // because the handler may run on multiple server goroutines.
+ var (
+ mu sync.Mutex
+ attempts = map[uint64]int{}
+ )
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ t.Helper()
+
+ // The client pages with an "after" cursor; parse it to pick a scenario.
+ q := r.URL.Query()
+ afterRaw := q.Get("after")
+ after, err := strconv.ParseUint(afterRaw, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid after", http.StatusBadRequest)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/jsonlines")
+
+ mu.Lock()
+ attempts[after]++
+ attempt := attempts[after]
+ mu.Unlock()
+
+ // write emits each record as one JSON line (JSONL page body).
+ write := func(recs ...types.ExportRecord) {
+ for _, rec := range recs {
+ b, _ := json.Marshal(rec)
+ _, _ = fmt.Fprintln(w, string(b))
+ }
+ }
+
+ // Each cursor value maps to one scripted scenario.
+ switch after {
+ case 0:
+ write(records[0], records[1])
+ case 2:
+ // Truncate the tail on first pass: seq=5 is present but partial.
+ line3, _ := json.Marshal(records[2])
+ line4, _ := json.Marshal(records[3])
+ line5, _ := json.Marshal(records[4])
+ _, _ = fmt.Fprintln(w, string(line3))
+ _, _ = fmt.Fprintln(w, string(line4))
+ partial := string(line5)
+ if len(partial) > 16 {
+ partial = partial[:len(partial)-16]
+ }
+ // Fprint (no newline) leaves the cut-off record as an unterminated
+ // final line, simulating a mid-record connection drop.
+ _, _ = fmt.Fprint(w, partial)
+ case 4:
+ // First response is rate-limited; retry overlaps with an old seq.
+ if attempt == 1 {
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+ return
+ }
+
+ // records[2] (seq=3) was already committed — the client must dedup it.
+ write(records[2], records[4], records[5])
+ case 6:
+ // caught up
+ default:
+ http.Error(w, "unexpected after", http.StatusBadRequest)
+ }
+ }))
+ defer server.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ // Config tuned for a fast test: small commit batches, millisecond retry
+ // delays, and ReplayTrace enabled so trace.jsonl is produced for the
+ // assertions at the bottom.
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: server.URL,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 100000,
+ CommitBatchSize: 4,
+ VerifyWorkers: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: time.Second,
+ RequestTimeout: 5 * time.Second,
+ HTTPRetryMaxAttempts: 3,
+ HTTPRetryBaseDelay: 10 * time.Millisecond,
+ HTTPRetryMaxDelay: 20 * time.Millisecond,
+ RateLimit: config.RateLimit{
+ ResolveRPS: 30,
+ ResolveBurst: 60,
+ ProofRPS: 10,
+ ProofBurst: 20,
+ },
+ ReplayTrace: true,
+ }
+
+ client := NewClient(server.URL, ClientOptions{
+ MaxAttempts: cfg.HTTPRetryMaxAttempts,
+ BaseDelay: cfg.HTTPRetryBaseDelay,
+ MaxDelay: cfg.HTTPRetryMaxDelay,
+ })
+ service := NewService(cfg, store, client, bl, checkpoint.NewManager(store, dataDir, keyPath))
+ defer service.Close()
+
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+
+ // Flush before inspecting the store so pending commits are durable —
+ // NOTE(review): assumes Flush drains any buffered batch; confirm against
+ // the Service implementation.
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ // All 6 served records must have been committed despite the truncated
+ // page, the 429, and the overlapping retry.
+ seq, err := store.GetGlobalSeq()
+ if err != nil {
+ t.Fatalf("get global seq: %v", err)
+ }
+ if seq != 6 {
+ t.Fatalf("global seq mismatch: got %d want 6", seq)
+ }
+
+ // Walk committed op refs in seq order, counting total refs and any
+ // holes between consecutive seqs.
+ var refs int
+ var gaps uint64
+ var prev uint64
+ err = store.ForEachOpSeqRef(func(seq uint64, _ types.BlockRefV1) error {
+ refs++
+ if prev > 0 && seq > prev+1 {
+ gaps += seq - prev - 1
+ }
+ prev = seq
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("iterate op refs: %v", err)
+ }
+ if refs != 6 {
+ t.Fatalf("op ref count mismatch: got %d want 6", refs)
+ }
+ if gaps != 0 {
+ t.Fatalf("unexpected sequence gaps: %d", gaps)
+ }
+
+ // The trace file must show that the service noticed the truncated tail
+ // and explicitly deduped the replayed seq rather than double-committing.
+ tracePath := filepath.Join(dataDir, "trace.jsonl")
+ traceBytes, err := os.ReadFile(tracePath)
+ if err != nil {
+ t.Fatalf("read trace file: %v", err)
+ }
+
+ trace := string(traceBytes)
+ if !strings.Contains(trace, `"kind":"export_fetch"`) {
+ t.Fatalf("trace missing export_fetch event")
+ }
+ if !strings.Contains(trace, `"truncated_tail":true`) {
+ t.Fatalf("trace missing truncated tail marker")
+ }
+ if !strings.Contains(trace, `"kind":"dedup_decision"`) {
+ t.Fatalf("trace missing dedup decision event")
+ }
+ if !strings.Contains(trace, `"reason":"already_committed_seq"`) {
+ t.Fatalf("trace missing already_committed_seq reason")
+ }
+}
+
+// TestReplayFailsFastOnSequenceGap serves an export page that omits seq=2
+// entirely (1 then 3) and asserts that Replay returns an error naming the
+// gap — strict seq accounting must fail fast instead of silently skipping.
+func TestReplayFailsFastOnSequenceGap(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ // Mirror signing key, same setup as the lossless-replay test.
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedChainRecords(t, 3)
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/jsonlines")
+ // Deliberately skip seq=2.
+ for _, rec := range []types.ExportRecord{records[0], records[2]} {
+ b, _ := json.Marshal(rec)
+ _, _ = fmt.Fprintln(w, string(b))
+ }
+ }))
+ defer server.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ // Single retry attempt: the gap should surface immediately rather than
+ // being retried away.
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: server.URL,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 100000,
+ CommitBatchSize: 2,
+ VerifyWorkers: 1,
+ ExportPageSize: 1000,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: time.Second,
+ RequestTimeout: 5 * time.Second,
+ HTTPRetryMaxAttempts: 1,
+ HTTPRetryBaseDelay: 10 * time.Millisecond,
+ HTTPRetryMaxDelay: 20 * time.Millisecond,
+ RateLimit: config.RateLimit{
+ ResolveRPS: 30,
+ ResolveBurst: 60,
+ ProofRPS: 10,
+ ProofBurst: 20,
+ },
+ ReplayTrace: true,
+ }
+
+ service := NewService(cfg, store, NewClient(server.URL), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ defer service.Close()
+
+ err = service.Replay(context.Background())
+ if err == nil {
+ t.Fatalf("expected replay to fail on sequence gap")
+ }
+ // The error must both classify the failure and carry the offending
+ // expected/got seq pair for diagnostics.
+ if !strings.Contains(err.Error(), "sequence gap detected") {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if !strings.Contains(err.Error(), "expected=2 got=3") {
+ t.Fatalf("error missing sequence details: %v", err)
+ }
+}
+
+// buildSignedChainRecords constructs `total` ExportRecords for a single DID
+// forming a prev-linked chain: each operation is ed25519-signed over its
+// canonicalized payload, and each record's prev field references the CID of
+// the previous operation. Seqs 5 and 6 (when present) share one timestamp
+// so callers can exercise deterministic ordering on CreatedAt ties.
+func buildSignedChainRecords(t *testing.T, total uint64) []types.ExportRecord {
+ t.Helper()
+
+ // One throwaway keypair signs every operation in the chain.
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+
+ out := make([]types.ExportRecord, 0, total)
+ did := "did:plc:chain"
+ createdAtBase := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
+ prev := ""
+
+ for seq := uint64(1); seq <= total; seq++ {
+ payloadDoc := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": seq},
+ }
+
+ // The genesis record (seq=1) has no prev link.
+ if prev != "" {
+ payloadDoc["prev"] = prev
+ }
+
+ // Sign the canonical form of the payload — NOTE(review): assumes
+ // CanonicalizeJSON yields the same bytes the verifier recomputes.
+ payloadBytes, _ := json.Marshal(payloadDoc)
+ canon, _ := types.CanonicalizeJSON(payloadBytes)
+ sig := ed25519.Sign(priv, canon)
+ op := map[string]any{
+ "did": did,
+ "didDoc": payloadDoc["didDoc"],
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ "sigPayload": base64.RawURLEncoding.EncodeToString(canon),
+ "sig": base64.RawURLEncoding.EncodeToString(sig),
+ }
+ if prev != "" {
+ op["prev"] = prev
+ }
+
+ // The record's CID is a digest of the canonicalized full operation.
+ opRaw, _ := json.Marshal(op)
+ opCanon, _ := types.CanonicalizeJSON(opRaw)
+ cid := types.ComputeDigestCID(opCanon)
+
+ // Timestamps advance one second per seq...
+ createdAt := createdAtBase.Add(time.Duration(seq-1) * time.Second).Format(time.RFC3339Nano)
+ if seq == 5 || seq == 6 {
+ // Identical timestamps to validate deterministic tie handling.
+ createdAt = createdAtBase.Add(4 * time.Second).Format(time.RFC3339Nano)
+ }
+
+ out = append(out, types.ExportRecord{
+ Seq: seq,
+ DID: did,
+ CreatedAt: createdAt,
+ CID: cid,
+ Operation: opRaw,
+ })
+ // Chain the next record to this one.
+ prev = cid
+ }
+
+ return out
+}