aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service_integration_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest/service_integration_test.go')
-rw-r--r--internal/ingest/service_integration_test.go69
1 files changed, 69 insertions, 0 deletions
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
index d7bd1b6..e01dd4e 100644
--- a/internal/ingest/service_integration_test.go
+++ b/internal/ingest/service_integration_test.go
@@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
out = append(out, rec1, rec2, rec3)
return out
}
+
+func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedRecords(t)
+ sourcePath := filepath.Join(tmp, "sample.ndjson")
+ file, err := os.Create(sourcePath)
+ if err != nil {
+ t.Fatalf("create source: %v", err)
+ }
+ for _, rec := range records {
+ b, _ := json.Marshal(rec)
+ if _, err := fmt.Fprintln(file, string(b)); err != nil {
+ t.Fatalf("write source: %v", err)
+ }
+ }
+ file.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5,
+ }
+ service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
+ t.Fatalf("expected cancellation error")
+ }
+}