aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service_integration_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest/service_integration_test.go')
-rw-r--r--internal/ingest/service_integration_test.go157
1 files changed, 157 insertions, 0 deletions
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
new file mode 100644
index 0000000..d7bd1b6
--- /dev/null
+++ b/internal/ingest/service_integration_test.go
@@ -0,0 +1,157 @@
+package ingest
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+func TestReplayIntegration(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedRecords(t)
+ sourcePath := filepath.Join(tmp, "sample.ndjson")
+ file, err := os.Create(sourcePath)
+ if err != nil {
+ t.Fatalf("create source: %v", err)
+ }
+ for _, rec := range records {
+ b, _ := json.Marshal(rec)
+ if _, err := fmt.Fprintln(file, string(b)); err != nil {
+ t.Fatalf("write source: %v", err)
+ }
+ }
+ file.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ }
+ service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ seq, err := store.GetGlobalSeq()
+ if err != nil {
+ t.Fatalf("get global seq: %v", err)
+ }
+ if seq != 3 {
+ t.Fatalf("global seq mismatch: got %d want 3", seq)
+ }
+
+ s, ok, err := store.GetState("did:plc:alice")
+ if err != nil {
+ t.Fatalf("get state: %v", err)
+ }
+ if !ok {
+ t.Fatalf("missing alice state")
+ }
+ if s.LatestOpSeq != 2 {
+ t.Fatalf("latest op seq mismatch for alice: got %d want 2", s.LatestOpSeq)
+ }
+ if err := service.VerifyDID(context.Background(), "did:plc:alice"); err != nil {
+ t.Fatalf("verify alice did: %v", err)
+ }
+
+ if _, err := service.Snapshot(context.Background()); err != nil {
+ t.Fatalf("snapshot: %v", err)
+ }
+ if _, ok, err := store.GetLatestCheckpoint(); err != nil || !ok {
+ t.Fatalf("expected latest checkpoint, err=%v ok=%v", err, ok)
+ }
+}
+
+func buildSignedRecords(t *testing.T) []types.ExportRecord {
+ t.Helper()
+ pub1, priv1, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key1: %v", err)
+ }
+ pub2, priv2, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key2: %v", err)
+ }
+ var out []types.ExportRecord
+
+ mk := func(seq uint64, did, prev string, pub ed25519.PublicKey, priv ed25519.PrivateKey) types.ExportRecord {
+ payloadDoc := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": seq},
+ }
+ if prev != "" {
+ payloadDoc["prev"] = prev
+ }
+ payloadBytes, _ := json.Marshal(payloadDoc)
+ canon, _ := types.CanonicalizeJSON(payloadBytes)
+ sig := ed25519.Sign(priv, canon)
+ op := map[string]any{
+ "did": did,
+ "didDoc": payloadDoc["didDoc"],
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ "sigPayload": base64.RawURLEncoding.EncodeToString(canon),
+ "sig": base64.RawURLEncoding.EncodeToString(sig),
+ }
+ if prev != "" {
+ op["prev"] = prev
+ }
+ opRaw, _ := json.Marshal(op)
+ opCanon, _ := types.CanonicalizeJSON(opRaw)
+ cid := types.ComputeDigestCID(opCanon)
+ return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: opRaw}
+ }
+
+ rec1 := mk(1, "did:plc:alice", "", pub1, priv1)
+ rec2 := mk(2, "did:plc:alice", rec1.CID, pub1, priv1)
+ rec3 := mk(3, "did:plc:bob", "", pub2, priv2)
+ out = append(out, rec1, rec2, rec3)
+ return out
+}