package ingest

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"testing"

	"github.com/Fuwn/plutia/internal/checkpoint"
	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
)

// writeReplayFixture creates the on-disk inputs shared by the replay tests
// inside a fresh t.TempDir():
//
//   - dataDir:    an empty "data" directory,
//   - keyPath:    "mirror.key", holding a random ed25519 seed encoded as
//     unpadded URL-safe base64,
//   - sourcePath: "sample.ndjson", holding the records produced by
//     buildSignedRecords, one JSON document per line.
//
// Any failure aborts the calling test.
func writeReplayFixture(t *testing.T) (dataDir, keyPath, sourcePath string) {
	t.Helper()

	tmp := t.TempDir()

	dataDir = filepath.Join(tmp, "data")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		t.Fatalf("mkdir data dir: %v", err)
	}

	keySeed := make([]byte, ed25519.SeedSize)
	if _, err := rand.Read(keySeed); err != nil {
		t.Fatalf("rand seed: %v", err)
	}

	keyPath = filepath.Join(tmp, "mirror.key")
	encodedSeed := base64.RawURLEncoding.EncodeToString(keySeed)
	if err := os.WriteFile(keyPath, []byte(encodedSeed), 0o600); err != nil {
		t.Fatalf("write mirror key: %v", err)
	}

	records := buildSignedRecords(t)

	sourcePath = filepath.Join(tmp, "sample.ndjson")
	file, err := os.Create(sourcePath)
	if err != nil {
		t.Fatalf("create source: %v", err)
	}

	for _, rec := range records {
		line, err := json.Marshal(rec)
		if err != nil {
			file.Close() // best effort; the marshal error is the one worth reporting
			t.Fatalf("marshal record: %v", err)
		}
		if _, err := fmt.Fprintln(file, string(line)); err != nil {
			file.Close() // best effort; the write error is the one worth reporting
			t.Fatalf("write source: %v", err)
		}
	}

	// A failed close can mean the data never reached disk, which would
	// otherwise show up later as a confusing replay failure.
	if err := file.Close(); err != nil {
		t.Fatalf("close source: %v", err)
	}

	return dataDir, keyPath, sourcePath
}

// replayTestConfig returns the mirror-mode configuration used by the replay
// tests, pointing at the given data directory, mirror key file and NDJSON
// source file.
func replayTestConfig(dataDir, keyPath, sourcePath string) config.Config {
	return config.Config{
		Mode:                 config.ModeMirror,
		DataDir:              dataDir,
		PLCSource:            sourcePath,
		VerifyPolicy:         config.VerifyFull,
		ZstdLevel:            3,
		BlockSizeMB:          4,
		CheckpointInterval:   2,
		ListenAddr:           ":0",
		MirrorPrivateKeyPath: keyPath,
	}
}

// TestReplayIntegration replays the three fixture records through a Service
// backed by a Pebble store and a block log, then checks that:
//
//   - the global sequence is 3,
//   - did:plc:alice has state with LatestOpSeq 2,
//   - VerifyDID succeeds for did:plc:alice,
//   - Snapshot succeeds and a latest checkpoint is recorded in the store.
func TestReplayIntegration(t *testing.T) {
	dataDir, keyPath, sourcePath := writeReplayFixture(t)

	store, err := storage.OpenPebble(dataDir)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()

	if err := store.SetMode(config.ModeMirror); err != nil {
		t.Fatalf("set mode: %v", err)
	}

	// NOTE(review): 3 and 4 presumably mirror cfg.ZstdLevel and
	// cfg.BlockSizeMB below; confirm against storage.OpenBlockLog.
	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
	if err != nil {
		t.Fatalf("open block log: %v", err)
	}

	cfg := replayTestConfig(dataDir, keyPath, sourcePath)
	service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))

	ctx := context.Background()

	if err := service.Replay(ctx); err != nil {
		t.Fatalf("replay: %v", err)
	}
	if err := service.Flush(ctx); err != nil {
		t.Fatalf("flush: %v", err)
	}

	seq, err := store.GetGlobalSeq()
	if err != nil {
		t.Fatalf("get global seq: %v", err)
	}
	if seq != 3 {
		t.Fatalf("global seq mismatch: got %d want 3", seq)
	}

	s, ok, err := store.GetState("did:plc:alice")
	if err != nil {
		t.Fatalf("get state: %v", err)
	}
	if !ok {
		t.Fatalf("missing alice state")
	}
	if s.LatestOpSeq != 2 {
		t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq)
	}

	if err := service.VerifyDID(ctx, "did:plc:alice"); err != nil {
		t.Fatalf("verify alice did: %v", err)
	}

	if _, err := service.Snapshot(ctx); err != nil {
		t.Fatalf("snapshot: %v", err)
	}

	if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok {
		t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok)
	}
}

// buildSignedRecords returns three export records signed with freshly
// generated ed25519 keys:
//
//	seq 1: did:plc:alice, no prev
//	seq 2: did:plc:alice, prev = CID of record 1
//	seq 3: did:plc:bob,   no prev (signed with a second key pair)
//
// Any marshalling, canonicalization or key-generation failure aborts the
// calling test.
func buildSignedRecords(t *testing.T) []types.ExportRecord {
	t.Helper()

	pub1, priv1, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key1: %v", err)
	}

	pub2, priv2, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		t.Fatalf("generate key2: %v", err)
	}

	// mk builds one record: it signs the canonical form of the payload
	// document, embeds the signature, the signed payload and the public key
	// in the operation, and derives the record CID from the canonical form of
	// that operation.
	mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord {
		payloadDoc := map[string]any{
			"did":    did,
			"didDoc": map[string]any{"id": did, "seq": seq},
		}
		if prev != "" {
			payloadDoc["prev"] = prev
		}

		payloadBytes, err := json.Marshal(payloadDoc)
		if err != nil {
			t.Fatalf("marshal payload for seq %d: %v", seq, err)
		}
		canon, err := types.CanonicalizeJSON(payloadBytes)
		if err != nil {
			t.Fatalf("canonicalize payload for seq %d: %v", seq, err)
		}

		sig := ed25519.Sign(priv, canon)

		op := map[string]any{
			"did":        did,
			"didDoc":     payloadDoc["didDoc"],
			"publicKey":  base64.RawURLEncoding.EncodeToString(pub),
			"sigPayload": base64.RawURLEncoding.EncodeToString(canon),
			"sig":        base64.RawURLEncoding.EncodeToString(sig),
		}
		if prev != "" {
			op["prev"] = prev
		}

		opRaw, err := json.Marshal(op)
		if err != nil {
			t.Fatalf("marshal operation for seq %d: %v", seq, err)
		}
		opCanon, err := types.CanonicalizeJSON(opRaw)
		if err != nil {
			t.Fatalf("canonicalize operation for seq %d: %v", seq, err)
		}

		return types.ExportRecord{
			Seq:       seq,
			DID:       did,
			CID:       types.ComputeDigestCID(opCanon),
			Operation: opRaw,
		}
	}

	rec1 := mk(1, "did:plc:alice", "", pub1, priv1)
	rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1)
	rec3 := mk(3, "did:plc:bob", "", pub2, priv2)

	return []types.ExportRecord{rec1, rec2, rec3}
}

// TestRecomputeTipAtOrBeforeHonorsContextCancellation replays the fixture
// records and then calls RecomputeTipAtOrBefore with an already-cancelled
// context, expecting a non-nil error.
func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
	dataDir, keyPath, sourcePath := writeReplayFixture(t)

	store, err := storage.OpenPebble(dataDir)
	if err != nil {
		t.Fatalf("open pebble: %v", err)
	}
	defer store.Close()

	if err := store.SetMode(config.ModeMirror); err != nil {
		t.Fatalf("set mode: %v", err)
	}

	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
	if err != nil {
		t.Fatalf("open block log: %v", err)
	}

	cfg := replayTestConfig(dataDir, keyPath, sourcePath)
	cfg.PollInterval = 5

	service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))

	if err := service.Replay(context.Background()); err != nil {
		t.Fatalf("replay: %v", err)
	}
	if err := service.Flush(context.Background()); err != nil {
		t.Fatalf("flush: %v", err)
	}

	// Cancel before the call so the context is already done on entry.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
		t.Fatalf("expected cancellation error")
	}
}