package ingest

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/Fuwn/plutia/internal/checkpoint"
	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
)

// TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps replays a
// six-record signed chain from a fake export endpoint that (a) truncates the
// trailing JSON line of one page mid-record, and (b) rate-limits another page
// so the retry overlaps an already-committed sequence. It asserts the final
// committed range is contiguous (no gaps), the global sequence reaches 6, and
// the replay trace records the export_fetch, truncated-tail, and dedup events.
func TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps(t *testing.T) {
	tmp := t.TempDir()
	dataDir := filepath.Join(tmp, "data")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		t.Fatalf("mkdir data dir: %v", err)
	}

	// Generate a throwaway mirror signing key; the checkpoint manager reads it
	// from disk as a base64url-encoded ed25519 seed.
	keySeed := make([]byte, ed25519.SeedSize)
	if _, err := rand.Read(keySeed); err != nil {
		t.Fatalf("rand seed: %v", err)
	}
	keyPath := filepath.Join(tmp, "mirror.key")
	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
		t.Fatalf("write mirror key: %v", err)
	}

	records := buildSignedChainRecords(t, 6)

	// attempts counts how many times each "after" cursor was requested so the
	// handler can fail the first fetch of a page and succeed on retry.
	var (
		mu       sync.Mutex
		attempts = map[uint64]int{}
	)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		t.Helper()
		q := r.URL.Query()
		afterRaw := q.Get("after")
		after, err := strconv.ParseUint(afterRaw, 10, 64)
		if err != nil {
			http.Error(w, "invalid after", http.StatusBadRequest)
			return
		}
		w.Header().Set("Content-Type", "application/jsonlines")
		mu.Lock()
		attempts[after]++
		attempt := attempts[after]
		mu.Unlock()
		write := func(recs ...types.ExportRecord) {
			for _, rec := range recs {
				b, _ := json.Marshal(rec)
				_, _ = fmt.Fprintln(w, string(b))
			}
		}
		switch after {
		case 0:
			write(records[0], records[1])
		case 2:
			// Truncate the tail on first pass: seq=5 is present but partial.
			line3, _ := json.Marshal(records[2])
			line4, _ := json.Marshal(records[3])
			line5, _ := json.Marshal(records[4])
			_, _ = fmt.Fprintln(w, string(line3))
			_, _ = fmt.Fprintln(w, string(line4))
			partial := string(line5)
			if len(partial) > 16 {
				partial = partial[:len(partial)-16]
			}
			// No trailing newline: the client must treat this as a torn line.
			_, _ = fmt.Fprint(w, partial)
		case 4:
			// First response is rate-limited; retry overlaps with an old seq.
			if attempt == 1 {
				http.Error(w, "rate limited", http.StatusTooManyRequests)
				return
			}
			write(records[2], records[4], records[5])
		case 6:
			// caught up
		default:
			http.Error(w, "unexpected after", http.StatusBadRequest)
		}
	}))
	defer server.Close()

	store, err := storage.OpenPebble(dataDir)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()
	if err := store.SetMode(config.ModeMirror); err != nil {
		t.Fatalf("set mode: %v", err)
	}
	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
	if err != nil {
		t.Fatalf("open block log: %v", err)
	}

	cfg := config.Config{
		Mode:                 config.ModeMirror,
		DataDir:              dataDir,
		PLCSource:            server.URL,
		VerifyPolicy:         config.VerifyFull,
		ZstdLevel:            3,
		BlockSizeMB:          4,
		CheckpointInterval:   100000,
		CommitBatchSize:      4,
		VerifyWorkers:        2,
		ListenAddr:           ":0",
		MirrorPrivateKeyPath: keyPath,
		PollInterval:         time.Second,
		RequestTimeout:       5 * time.Second,
		HTTPRetryMaxAttempts: 3,
		HTTPRetryBaseDelay:   10 * time.Millisecond,
		HTTPRetryMaxDelay:    20 * time.Millisecond,
		RateLimit: config.RateLimit{
			ResolveRPS:   30,
			ResolveBurst: 60,
			ProofRPS:     10,
			ProofBurst:   20,
		},
		ReplayTrace: true,
	}
	client := NewClient(server.URL, ClientOptions{
		MaxAttempts: cfg.HTTPRetryMaxAttempts,
		BaseDelay:   cfg.HTTPRetryBaseDelay,
		MaxDelay:    cfg.HTTPRetryMaxDelay,
	})
	service := NewService(cfg, store, client, bl, checkpoint.NewManager(store, dataDir, keyPath))
	defer service.Close()

	if err := service.Replay(context.Background()); err != nil {
		t.Fatalf("replay: %v", err)
	}
	if err := service.Flush(context.Background()); err != nil {
		t.Fatalf("flush: %v", err)
	}

	seq, err := store.GetGlobalSeq()
	if err != nil {
		t.Fatalf("get global seq: %v", err)
	}
	if seq != 6 {
		t.Fatalf("global seq mismatch: got %d want 6", seq)
	}

	// Walk committed op refs in order and verify the sequence is gap-free.
	var refs int
	var gaps uint64
	var prev uint64
	err = store.ForEachOpSeqRef(func(seq uint64, _ types.BlockRefV1) error {
		refs++
		if prev > 0 && seq > prev+1 {
			gaps += seq - prev - 1
		}
		prev = seq
		return nil
	})
	if err != nil {
		t.Fatalf("iterate op refs: %v", err)
	}
	if refs != 6 {
		t.Fatalf("op ref count mismatch: got %d want 6", refs)
	}
	if gaps != 0 {
		t.Fatalf("unexpected sequence gaps: %d", gaps)
	}

	// The replay trace must record the fetch, the torn tail, and the dedup
	// decision taken for the overlapping retry of an already-committed seq.
	tracePath := filepath.Join(dataDir, "trace.jsonl")
	traceBytes, err := os.ReadFile(tracePath)
	if err != nil {
		t.Fatalf("read trace file: %v", err)
	}
	trace := string(traceBytes)
	if !strings.Contains(trace, `"kind":"export_fetch"`) {
		t.Fatalf("trace missing export_fetch event")
	}
	if !strings.Contains(trace, `"truncated_tail":true`) {
		t.Fatalf("trace missing truncated tail marker")
	}
	if !strings.Contains(trace, `"kind":"dedup_decision"`) {
		t.Fatalf("trace missing dedup decision event")
	}
	if !strings.Contains(trace, `"reason":"already_committed_seq"`) {
		t.Fatalf("trace missing already_committed_seq reason")
	}
}

// TestReplayFailsFastOnSequenceGap serves an export stream that deliberately
// omits seq=2 and asserts Replay returns an error naming the gap rather than
// silently committing a discontiguous range.
func TestReplayFailsFastOnSequenceGap(t *testing.T) {
	tmp := t.TempDir()
	dataDir := filepath.Join(tmp, "data")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		t.Fatalf("mkdir data dir: %v", err)
	}

	keySeed := make([]byte, ed25519.SeedSize)
	if _, err := rand.Read(keySeed); err != nil {
		t.Fatalf("rand seed: %v", err)
	}
	keyPath := filepath.Join(tmp, "mirror.key")
	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
		t.Fatalf("write mirror key: %v", err)
	}

	records := buildSignedChainRecords(t, 3)
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/jsonlines")
		// Deliberately skip seq=2.
		for _, rec := range []types.ExportRecord{records[0], records[2]} {
			b, _ := json.Marshal(rec)
			_, _ = fmt.Fprintln(w, string(b))
		}
	}))
	defer server.Close()

	store, err := storage.OpenPebble(dataDir)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()
	if err := store.SetMode(config.ModeMirror); err != nil {
		t.Fatalf("set mode: %v", err)
	}
	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
	if err != nil {
		t.Fatalf("open block log: %v", err)
	}

	cfg := config.Config{
		Mode:                 config.ModeMirror,
		DataDir:              dataDir,
		PLCSource:            server.URL,
		VerifyPolicy:         config.VerifyFull,
		ZstdLevel:            3,
		BlockSizeMB:          4,
		CheckpointInterval:   100000,
		CommitBatchSize:      2,
		VerifyWorkers:        1,
		ExportPageSize:       1000,
		ListenAddr:           ":0",
		MirrorPrivateKeyPath: keyPath,
		PollInterval:         time.Second,
		RequestTimeout:       5 * time.Second,
		HTTPRetryMaxAttempts: 1,
		HTTPRetryBaseDelay:   10 * time.Millisecond,
		HTTPRetryMaxDelay:    20 * time.Millisecond,
		RateLimit: config.RateLimit{
			ResolveRPS:   30,
			ResolveBurst: 60,
			ProofRPS:     10,
			ProofBurst:   20,
		},
		ReplayTrace: true,
	}
	service := NewService(cfg, store, NewClient(server.URL), bl, checkpoint.NewManager(store, dataDir, keyPath))
	defer service.Close()

	err = service.Replay(context.Background())
	if err == nil {
		t.Fatalf("expected replay to fail on sequence gap")
	}
	if !strings.Contains(err.Error(), "sequence gap detected") {
		t.Fatalf("unexpected error: %v", err)
	}
	if !strings.Contains(err.Error(), "expected=2 got=3") {
		t.Fatalf("error missing sequence details: %v", err)
	}
}

// buildSignedChainRecords fabricates `total` export records forming a valid
// signed chain: each op's canonical payload is ed25519-signed with a fresh
// key, its CID is the digest of the canonical op, and each record's "prev"
// points at the previous record's CID. Records seq=5 and seq=6 share an
// identical createdAt timestamp to exercise deterministic tie handling.
func buildSignedChainRecords(t *testing.T, total uint64) []types.ExportRecord {
	t.Helper()
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key: %v", err)
	}
	out := make([]types.ExportRecord, 0, total)
	did := "did:plc:chain"
	createdAtBase := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
	prev := ""
	for seq := uint64(1); seq <= total; seq++ {
		payloadDoc := map[string]any{
			"did":    did,
			"didDoc": map[string]any{"id": did, "seq": seq},
		}
		if prev != "" {
			payloadDoc["prev"] = prev
		}
		payloadBytes, _ := json.Marshal(payloadDoc)
		canon, _ := types.CanonicalizeJSON(payloadBytes)
		sig := ed25519.Sign(priv, canon)
		op := map[string]any{
			"did":        did,
			"didDoc":     payloadDoc["didDoc"],
			"publicKey":  base64.RawURLEncoding.EncodeToString(pub),
			"sigPayload": base64.RawURLEncoding.EncodeToString(canon),
			"sig":        base64.RawURLEncoding.EncodeToString(sig),
		}
		if prev != "" {
			op["prev"] = prev
		}
		opRaw, _ := json.Marshal(op)
		opCanon, _ := types.CanonicalizeJSON(opRaw)
		cid := types.ComputeDigestCID(opCanon)
		createdAt := createdAtBase.Add(time.Duration(seq-1) * time.Second).Format(time.RFC3339Nano)
		if seq == 5 || seq == 6 {
			// Identical timestamps to validate deterministic tie handling.
			createdAt = createdAtBase.Add(4 * time.Second).Format(time.RFC3339Nano)
		}
		out = append(out, types.ExportRecord{
			Seq:       seq,
			DID:       did,
			CreatedAt: createdAt,
			CID:       cid,
			Operation: opRaw,
		})
		prev = cid
	}
	return out
}