diff options
Diffstat (limited to 'internal/ingest/service_integration_test.go')
| -rw-r--r-- | internal/ingest/service_integration_test.go | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go index d7bd1b6..e01dd4e 100644 --- a/internal/ingest/service_integration_test.go +++ b/internal/ingest/service_integration_test.go @@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord { out = append(out, rec1, rec2, rec3) return out } + +func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedRecords(t) + sourcePath := filepath.Join(tmp, "sample.ndjson") + file, err := os.Create(sourcePath) + if err != nil { + t.Fatalf("create source: %v", err) + } + for _, rec := range records { + b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(file, string(b)); err != nil { + t.Fatalf("write source: %v", err) + } + } + file.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: sourcePath, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: 5, + } + service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath)) + if err := service.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + if err := service.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil { + t.Fatalf("expected cancellation error") + } +} |