diff options
Diffstat (limited to 'internal/ingest/service_integration_test.go')
| -rw-r--r-- | internal/ingest/service_integration_test.go | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go new file mode 100644 index 0000000..d7bd1b6 --- /dev/null +++ b/internal/ingest/service_integration_test.go @@ -0,0 +1,157 @@ +package ingest + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/Fuwn/plutia/internal/checkpoint" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" +) + +func TestReplayIntegration(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedRecords(t) + sourcePath := filepath.Join(tmp, "sample.ndjson") + file, err := os.Create(sourcePath) + if err != nil { + t.Fatalf("create source: %v", err) + } + for _, rec := range records { + b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(file, string(b)); err != nil { + t.Fatalf("write source: %v", err) + } + } + file.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: sourcePath, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + } + service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath)) + if err := service.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + if err := service.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + + seq, err := store.GetGlobalSeq() + if err != nil { + t.Fatalf("get global seq: %v", err) + } + if seq != 3 { + t.Fatalf("global seq mismatch: got %d want 3", seq) + } + + s, ok, err := store.GetState("did:plc:alice") + if err != nil { + t.Fatalf("get state: %v", err) + } + if !ok { + t.Fatalf("missing alice state") + } + if s.LatestOpSeq != 2 { + t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq) + } + if err := service.VerifyDID(context.Background(), "did:plc:alice"); err != nil { + t.Fatalf("verify alice did: %v", err) + } + + if _, err := service.Snapshot(context.Background()); err != nil { + t.Fatalf("snapshot: %v", err) + } + if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok { + t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok) + } +} + +func buildSignedRecords(t *testing.T) []types.ExportRecord { + t.Helper() + pub1, priv1, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key1: %v", err) + } + pub2, priv2, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key2: %v", err) + } + var out []types.ExportRecord + + mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord { + payloadDoc := map[string]any{ + "did": did, + "didDoc": map[string]any{"id": did, "seq": seq}, + } + if prev != "" { + payloadDoc["prev"] = prev + } + payloadBytes, _ := json.Marshal(payloadDoc) + canon, _ := types.CanonicalizeJSON(payloadBytes) + sig := ed25519.Sign(priv, canon) + op := map[string]any{ + "did": did, + "didDoc": payloadDoc["didDoc"], + "publicKey": base64.RawURLEncoding.EncodeToString(pub), + "sigPayload": base64.RawURLEncoding.EncodeToString(canon), + "sig": base64.RawURLEncoding.EncodeToString(sig), + } + if prev != "" { + op["prev"] = prev + } + opRaw, _ := json.Marshal(op) + opCanon, _ := types.CanonicalizeJSON(opRaw) + cid := types.ComputeDigestCID(opCanon) + return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: opRaw} + } + + rec1 := mk(1, "did:plc:alice", "", pub1, priv1) + rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1) + rec3 := mk(3, "did:plc:bob", "", pub2, priv2) + out = append(out, rec1, rec2, rec3) + return out +} |