package api import ( "context" "crypto/ed25519" "crypto/rand" "encoding/base64" "encoding/json" "fmt" "io" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "testing" "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" ) func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data: %v", err) } keyPath := filepath.Join(tmp, "mirror.key") seed := make([]byte, ed25519.SeedSize) if _, err := rand.Read(seed); err != nil { t.Fatalf("seed: %v", err) } if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { t.Fatalf("write key: %v", err) } recs := buildCheckpointScenarioRecords(t) sourcePath := filepath.Join(tmp, "records.ndjson") writeRecordsFile(t, sourcePath, recs) store, err := storage.OpenPebble(dataDir) if err != nil { t.Fatalf("open pebble: %v", err) } defer store.Close() if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } bl, err := storage.OpenBlockLog(dataDir, 3, 4) if err != nil { t.Fatalf("open block log: %v", err) } cfg := config.Config{ Mode: config.ModeMirror, DataDir: dataDir, PLCSource: sourcePath, VerifyPolicy: config.VerifyFull, ZstdLevel: 3, BlockSizeMB: 4, CheckpointInterval: 2, ListenAddr: ":0", MirrorPrivateKeyPath: keyPath, PollInterval: 5, } cpMgr := checkpoint.NewManager(store, dataDir, keyPath) svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) if err := svc.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } if err := svc.Flush(context.Background()); err != nil { t.Fatalf("flush: %v", err) } cp2, ok, err := store.GetCheckpoint(2) if err != nil || !ok { t.Fatalf("checkpoint 2 missing: ok=%v err=%v", ok, err) } ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler()) defer ts.Close() url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof?checkpoint=2" resp, err := http.Get(url) if err != nil { t.Fatalf("get proof: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) t.Fatalf("proof status: %d body=%s", resp.StatusCode, string(bodyBytes)) } var body struct { CheckpointSequence uint64 `json:"checkpoint_sequence"` ChainTipReference string `json:"chain_tip_reference"` InclusionProof struct { MerkleRoot string `json:"merkle_root"` } `json:"inclusion_proof"` } if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { t.Fatalf("decode response: %v", err) } if body.CheckpointSequence != 2 { t.Fatalf("unexpected checkpoint sequence: got %d want 2", body.CheckpointSequence) } if body.ChainTipReference != recs[0].CID { t.Fatalf("expected old tip at checkpoint=2: got %s want %s", body.ChainTipReference, recs[0].CID) } if body.InclusionProof.MerkleRoot != cp2.DIDMerkleRoot { t.Fatalf("merkle root mismatch: got %s want %s", body.InclusionProof.MerkleRoot, cp2.DIDMerkleRoot) } } func TestCorruptedBlockRefusesProof(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data: %v", err) } seed := make([]byte, ed25519.SeedSize) if _, err := rand.Read(seed); err != nil { t.Fatalf("seed: %v", err) } keyPath := filepath.Join(tmp, "mirror.key") if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { t.Fatalf("write key: %v", err) } recs := buildCheckpointScenarioRecords(t) sourcePath := filepath.Join(tmp, "records.ndjson") writeRecordsFile(t, sourcePath, recs) store, err := storage.OpenPebble(dataDir) if err != nil { t.Fatalf("open pebble: %v", err) } if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } bl, err := storage.OpenBlockLog(dataDir, 3, 4) if err != nil { t.Fatalf("open block log: %v", err) } cfg := config.Config{ Mode: config.ModeMirror, DataDir: dataDir, PLCSource: sourcePath, VerifyPolicy: config.VerifyFull, ZstdLevel: 3, BlockSizeMB: 4, CheckpointInterval: 2, ListenAddr: ":0", MirrorPrivateKeyPath: keyPath, PollInterval: 5, } cpMgr := checkpoint.NewManager(store, dataDir, keyPath) svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) if err := svc.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } if _, err := svc.Snapshot(context.Background()); err != nil { t.Fatalf("snapshot: %v", err) } svc.Close() if err := store.Close(); err != nil { t.Fatalf("close store: %v", err) } blockPath := filepath.Join(dataDir, "ops", "000001.zst") b, err := os.ReadFile(blockPath) if err != nil { t.Fatalf("read block: %v", err) } b[len(b)/2] ^= 0xFF if err := os.WriteFile(blockPath, b, 0o644); err != nil { t.Fatalf("write corrupted block: %v", err) } store2, err := storage.OpenPebble(dataDir) if err != nil { t.Fatalf("open store2: %v", err) } defer store2.Close() bl2, err := storage.OpenBlockLog(dataDir, 3, 4) if err != nil { t.Fatalf("open blocklog2: %v", err) } svc2 := ingest.NewService(cfg, store2, ingest.NewClient("file:///nonexistent"), bl2, checkpoint.NewManager(store2, dataDir, keyPath)) if !svc2.IsCorrupted() { t.Fatalf("expected service to detect corruption on restart") } ts := httptest.NewServer(NewServer(cfg, store2, svc2, checkpoint.NewManager(store2, dataDir, keyPath)).Handler()) defer ts.Close() url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof" resp, err := http.Get(url) if err != nil { t.Fatalf("request proof: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusServiceUnavailable { t.Fatalf("expected 503 for corrupted proof, got %d", resp.StatusCode) } } func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) { t.Helper() f, err := os.Create(path) if err != nil { t.Fatalf("create records file: %v", err) } defer f.Close() for _, rec := range recs { b, _ := json.Marshal(rec) if _, err := fmt.Fprintln(f, string(b)); err != nil { t.Fatalf("write record: %v", err) } } } func buildCheckpointScenarioRecords(t *testing.T) []types.ExportRecord { t.Helper() pub, priv, err := ed25519.GenerateKey(rand.Reader) if err != nil { t.Fatalf("generate key: %v", err) } mk := func(seq uint64, did, prev string) types.ExportRecord { unsigned := map[string]any{ "did": did, "didDoc": map[string]any{"id": did, "seq": seq}, "publicKey": base64.RawURLEncoding.EncodeToString(pub), } if prev != "" { unsigned["prev"] = prev } payload, _ := json.Marshal(unsigned) canon, _ := types.CanonicalizeJSON(payload) sig := ed25519.Sign(priv, canon) op := map[string]any{} for k, v := range unsigned { op[k] = v } op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon) op["sig"] = base64.RawURLEncoding.EncodeToString(sig) raw, _ := json.Marshal(op) opCanon, _ := types.CanonicalizeJSON(raw) cid := types.ComputeDigestCID(opCanon) return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: raw} } r1 := mk(1, "did:plc:alice", "") r2 := mk(2, "did:plc:bob", "") r3 := mk(3, "did:plc:alice", r1.CID) return []types.ExportRecord{r1, r2, r3} }