diff options
| author | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
| commit | 219b220b99b509e65cb1325ba67f00264e7cc25c (patch) | |
| tree | 746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 /internal/ingest/service_export_integrity_test.go | |
| parent | chore: improve user-facing API copy clarity and consistency (diff) | |
| download | plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip | |
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/service_export_integrity_test.go')
| -rw-r--r-- | internal/ingest/service_export_integrity_test.go | 357 |
1 file changed, 357 insertions, 0 deletions
diff --git a/internal/ingest/service_export_integrity_test.go b/internal/ingest/service_export_integrity_test.go new file mode 100644 index 0000000..92a1703 --- /dev/null +++ b/internal/ingest/service_export_integrity_test.go @@ -0,0 +1,357 @@ +package ingest + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/Fuwn/plutia/internal/checkpoint" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +func TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedChainRecords(t, 6) + + var ( + mu sync.Mutex + attempts = map[uint64]int{} + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Helper() + + q := r.URL.Query() + afterRaw := q.Get("after") + after, err := strconv.ParseUint(afterRaw, 10, 64) + if err != nil { + http.Error(w, "invalid after", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/jsonlines") + + mu.Lock() + attempts[after]++ + attempt := attempts[after] + mu.Unlock() + + write := func(recs ...types.ExportRecord) { + for _, rec := range recs { + b, _ := json.Marshal(rec) + _, _ = fmt.Fprintln(w, string(b)) + } + } + + switch after { + case 0: + write(records[0], records[1]) + case 2: + 
// Truncate the tail on first pass: seq=5 is present but partial. + line3, _ := json.Marshal(records[2]) + line4, _ := json.Marshal(records[3]) + line5, _ := json.Marshal(records[4]) + _, _ = fmt.Fprintln(w, string(line3)) + _, _ = fmt.Fprintln(w, string(line4)) + partial := string(line5) + if len(partial) > 16 { + partial = partial[:len(partial)-16] + } + _, _ = fmt.Fprint(w, partial) + case 4: + // First response is rate-limited; retry overlaps with an old seq. + if attempt == 1 { + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + + write(records[2], records[4], records[5]) + case 6: + // caught up + default: + http.Error(w, "unexpected after", http.StatusBadRequest) + } + })) + defer server.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: server.URL, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 100000, + CommitBatchSize: 4, + VerifyWorkers: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: time.Second, + RequestTimeout: 5 * time.Second, + HTTPRetryMaxAttempts: 3, + HTTPRetryBaseDelay: 10 * time.Millisecond, + HTTPRetryMaxDelay: 20 * time.Millisecond, + RateLimit: config.RateLimit{ + ResolveRPS: 30, + ResolveBurst: 60, + ProofRPS: 10, + ProofBurst: 20, + }, + ReplayTrace: true, + } + + client := NewClient(server.URL, ClientOptions{ + MaxAttempts: cfg.HTTPRetryMaxAttempts, + BaseDelay: cfg.HTTPRetryBaseDelay, + MaxDelay: cfg.HTTPRetryMaxDelay, + }) + service := NewService(cfg, store, client, bl, checkpoint.NewManager(store, dataDir, keyPath)) + defer service.Close() + + if err := 
service.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + + if err := service.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + + seq, err := store.GetGlobalSeq() + if err != nil { + t.Fatalf("get global seq: %v", err) + } + if seq != 6 { + t.Fatalf("global seq mismatch: got %d want 6", seq) + } + + var refs int + var gaps uint64 + var prev uint64 + err = store.ForEachOpSeqRef(func(seq uint64, _ types.BlockRefV1) error { + refs++ + if prev > 0 && seq > prev+1 { + gaps += seq - prev - 1 + } + prev = seq + return nil + }) + if err != nil { + t.Fatalf("iterate op refs: %v", err) + } + if refs != 6 { + t.Fatalf("op ref count mismatch: got %d want 6", refs) + } + if gaps != 0 { + t.Fatalf("unexpected sequence gaps: %d", gaps) + } + + tracePath := filepath.Join(dataDir, "trace.jsonl") + traceBytes, err := os.ReadFile(tracePath) + if err != nil { + t.Fatalf("read trace file: %v", err) + } + + trace := string(traceBytes) + if !strings.Contains(trace, `"kind":"export_fetch"`) { + t.Fatalf("trace missing export_fetch event") + } + if !strings.Contains(trace, `"truncated_tail":true`) { + t.Fatalf("trace missing truncated tail marker") + } + if !strings.Contains(trace, `"kind":"dedup_decision"`) { + t.Fatalf("trace missing dedup decision event") + } + if !strings.Contains(trace, `"reason":"already_committed_seq"`) { + t.Fatalf("trace missing already_committed_seq reason") + } +} + +func TestReplayFailsFastOnSequenceGap(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + 
records := buildSignedChainRecords(t, 3) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/jsonlines") + // Deliberately skip seq=2. + for _, rec := range []types.ExportRecord{records[0], records[2]} { + b, _ := json.Marshal(rec) + _, _ = fmt.Fprintln(w, string(b)) + } + })) + defer server.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: server.URL, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 100000, + CommitBatchSize: 2, + VerifyWorkers: 1, + ExportPageSize: 1000, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: time.Second, + RequestTimeout: 5 * time.Second, + HTTPRetryMaxAttempts: 1, + HTTPRetryBaseDelay: 10 * time.Millisecond, + HTTPRetryMaxDelay: 20 * time.Millisecond, + RateLimit: config.RateLimit{ + ResolveRPS: 30, + ResolveBurst: 60, + ProofRPS: 10, + ProofBurst: 20, + }, + ReplayTrace: true, + } + + service := NewService(cfg, store, NewClient(server.URL), bl, checkpoint.NewManager(store, dataDir, keyPath)) + defer service.Close() + + err = service.Replay(context.Background()) + if err == nil { + t.Fatalf("expected replay to fail on sequence gap") + } + if !strings.Contains(err.Error(), "sequence gap detected") { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), "expected=2 got=3") { + t.Fatalf("error missing sequence details: %v", err) + } +} + +func buildSignedChainRecords(t *testing.T, total uint64) []types.ExportRecord { + t.Helper() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err 
!= nil { + t.Fatalf("generate key: %v", err) + } + + out := make([]types.ExportRecord, 0, total) + did := "did:plc:chain" + createdAtBase := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC) + prev := "" + + for seq := uint64(1); seq <= total; seq++ { + payloadDoc := map[string]any{ + "did": did, + "didDoc": map[string]any{"id": did, "seq": seq}, + } + + if prev != "" { + payloadDoc["prev"] = prev + } + + payloadBytes, _ := json.Marshal(payloadDoc) + canon, _ := types.CanonicalizeJSON(payloadBytes) + sig := ed25519.Sign(priv, canon) + op := map[string]any{ + "did": did, + "didDoc": payloadDoc["didDoc"], + "publicKey": base64.RawURLEncoding.EncodeToString(pub), + "sigPayload": base64.RawURLEncoding.EncodeToString(canon), + "sig": base64.RawURLEncoding.EncodeToString(sig), + } + if prev != "" { + op["prev"] = prev + } + + opRaw, _ := json.Marshal(op) + opCanon, _ := types.CanonicalizeJSON(opRaw) + cid := types.ComputeDigestCID(opCanon) + + createdAt := createdAtBase.Add(time.Duration(seq-1) * time.Second).Format(time.RFC3339Nano) + if seq == 5 || seq == 6 { + // Identical timestamps to validate deterministic tie handling. + createdAt = createdAtBase.Add(4 * time.Second).Format(time.RFC3339Nano) + } + + out = append(out, types.ExportRecord{ + Seq: seq, + DID: did, + CreatedAt: createdAt, + CID: cid, + Operation: opRaw, + }) + prev = cid + } + + return out +} |