aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest/service.go')
-rw-r--r--internal/ingest/service.go304
1 files changed, 303 insertions, 1 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index e318422..1c95068 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -49,6 +49,8 @@ type Service struct {
corruptErr atomic.Value
startedAt time.Time
metricsSink MetricsSink
+ tracer *replayTracer
+ startupErr error
}
type MetricsSink interface {
@@ -71,6 +73,16 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
startedAt: time.Now(),
}
+ if cfg.ReplayTrace {
+ tracer, err := openReplayTracer(cfg.DataDir)
+
+ if err != nil {
+ s.startupErr = err
+ } else {
+ s.tracer = tracer
+ }
+ }
+
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
}
@@ -116,6 +128,10 @@ func (s *Service) Poll(ctx context.Context) error {
for {
if _, err := s.RunOnce(ctx); err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
+
+ if isFatalIntegrityError(err) {
+ return err
+ }
}
select {
@@ -127,6 +143,10 @@ func (s *Service) Poll(ctx context.Context) error {
}
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+ if s.startupErr != nil {
+ return false, s.startupErr
+ }
+
if err := s.CorruptionError(); err != nil {
return false, err
}
@@ -153,7 +173,19 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
limit = remaining
}
- records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+ pageSize := uint64(s.cfg.ExportPageSize)
+
+ if pageSize == 0 {
+ pageSize = 1000
+ }
+
+ batch, err := s.client.FetchExportBatch(ctx, lastSeq, limit, pageSize, s.traceFetchAttempt)
+
+ if err != nil {
+ return false, err
+ }
+
+ records, err := s.prepareRecordsForCommit(lastSeq, batch)
if err != nil {
return false, err
@@ -177,6 +209,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+ if lastCommitted != lastSeq+committed {
+ err := fatalIntegrityErrorf(
+ "sequence accounting mismatch: previous_global_seq=%d committed=%d new_global_seq=%d",
+ lastSeq,
+ committed,
+ lastCommitted,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "after": lastSeq,
+ "batch_count": len(records),
+ "last_committed": lastCommitted,
+ })
+
+ return committed > 0, err
+ }
+
if latestSourceSeq > lastCommitted {
atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
} else {
@@ -199,6 +247,22 @@ type verifyResult struct {
err error
}
// fatalIntegrityError is the error type used for integrity failures that must
// not be retried. Poll returns as soon as RunOnce yields an error for which
// isFatalIntegrityError reports true.
type fatalIntegrityError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (e fatalIntegrityError) Error() string {
	return e.msg
}

// fatalIntegrityErrorf formats a message the way fmt.Sprintf does and returns
// it as a fatalIntegrityError value.
func fatalIntegrityErrorf(format string, args ...any) error {
	text := fmt.Sprintf(format, args...)

	return fatalIntegrityError{msg: text}
}

// isFatalIntegrityError reports whether err, or any error it wraps, is a
// fatalIntegrityError value. A nil err yields false.
func isFatalIntegrityError(err error) bool {
	return errors.As(err, new(fatalIntegrityError))
}
+
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
verified, err := s.verifyRecords(ctx, records)
@@ -209,6 +273,98 @@ func (s *Service) processRecords(ctx context.Context, records []types.ExportReco
return s.commitVerified(ctx, verified)
}
+func (s *Service) prepareRecordsForCommit(lastSeq uint64, batch FetchBatch) ([]types.ExportRecord, error) {
+ if len(batch.Records) == 0 {
+ return nil, nil
+ }
+
+ sorted := append([]types.ExportRecord(nil), batch.Records...)
+
+ sort.SliceStable(sorted, func(i, j int) bool {
+ if sorted[i].Seq == sorted[j].Seq {
+ if sorted[i].DID == sorted[j].DID {
+ return sorted[i].CID < sorted[j].CID
+ }
+
+ return sorted[i].DID < sorted[j].DID
+ }
+
+ return sorted[i].Seq < sorted[j].Seq
+ })
+
+ out := make([]types.ExportRecord, 0, len(sorted))
+ seen := make(map[uint64]struct{}, len(sorted))
+ expected := lastSeq + 1
+
+ for _, rec := range sorted {
+ if rec.Seq == 0 {
+ err := fatalIntegrityErrorf(
+ "export record missing seq: after=%d request=%s did=%s cid=%s created_at=%s",
+ lastSeq,
+ batch.RequestURL,
+ rec.DID,
+ rec.CID,
+ rec.CreatedAt,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "after": lastSeq,
+ "request_url": batch.RequestURL,
+ "did": rec.DID,
+ "cid": rec.CID,
+ "created_at": rec.CreatedAt,
+ "parsed_records": batch.ParsedRecords,
+ })
+
+ return nil, err
+ }
+
+ if rec.Seq <= lastSeq {
+ s.traceDedupDecision(rec, "already_committed_seq", batch, lastSeq)
+ continue
+ }
+
+ if _, ok := seen[rec.Seq]; ok {
+ s.traceDedupDecision(rec, "duplicate_seq_in_batch", batch, lastSeq)
+ continue
+ }
+
+ if rec.Seq != expected {
+ err := fatalIntegrityErrorf(
+ "sequence gap detected: expected=%d got=%d previous_global_seq=%d request=%s first_created_at=%s last_created_at=%s parsed_records=%d skipped_lines=%d truncated_tail=%t",
+ expected,
+ rec.Seq,
+ lastSeq,
+ batch.RequestURL,
+ batch.FirstCreated,
+ batch.LastCreated,
+ batch.ParsedRecords,
+ batch.SkippedLines,
+ batch.TruncatedTail,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "expected_seq": expected,
+ "got_seq": rec.Seq,
+ "after": lastSeq,
+ "request_url": batch.RequestURL,
+ "count": batch.Count,
+ "first_created_at": batch.FirstCreated,
+ "last_created_at": batch.LastCreated,
+ "parsed_records": batch.ParsedRecords,
+ "skipped_lines": batch.SkippedLines,
+ "truncated_tail": batch.TruncatedTail,
+ })
+
+ return nil, err
+ }
+
+ seen[rec.Seq] = struct{}{}
+ out = append(out, rec)
+ expected++
+ }
+
+ return out, nil
+}
+
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
_ = ctx
workers := s.cfg.VerifyWorkers
@@ -252,6 +408,14 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
if err := s.verifier.VerifyOperation(op, existing); err != nil {
atomic.AddUint64(&s.stats.VerifyFailures, 1)
+ if strings.Contains(err.Error(), "prev linkage mismatch") {
+ diag := s.buildPrevMismatchDiagnostic(op, existing)
+ s.tracePrevMismatch(diag)
+ results <- verifyResult{index: task.index, err: fatalIntegrityErrorf("%s", diag.Message)}
+
+ continue
+ }
+
results <- verifyResult{index: task.index, err: err}
continue
@@ -383,6 +547,139 @@ func cloneState(in *types.StateV1) *types.StateV1 {
return &out
}
+type prevMismatchDiagnostic struct {
+ Message string `json:"message"`
+ DID string `json:"did"`
+ TipCID string `json:"tip_cid"`
+ PrevCID string `json:"prev_cid"`
+ OpCID string `json:"op_cid"`
+ CreatedAt string `json:"created_at"`
+ LastOps []types.BlockRefV1 `json:"last_ops"`
+}
+
+func (s *Service) buildPrevMismatchDiagnostic(op types.ParsedOperation, existing *types.StateV1) prevMismatchDiagnostic {
+ diag := prevMismatchDiagnostic{
+ DID: op.DID,
+ PrevCID: op.Prev,
+ OpCID: op.CID,
+ CreatedAt: op.RawRecord.CreatedAt,
+ }
+
+ if existing != nil {
+ diag.TipCID = existing.ChainTipHash
+ }
+
+ const tail = 8
+
+ seqs, err := s.store.ListDIDSequences(op.DID)
+
+ if err == nil && len(seqs) > 0 {
+ start := len(seqs) - tail
+
+ if start < 0 {
+ start = 0
+ }
+
+ for _, seq := range seqs[start:] {
+ ref, ok, getErr := s.store.GetOpSeqRef(seq)
+
+ if getErr != nil || !ok {
+ continue
+ }
+
+ diag.LastOps = append(diag.LastOps, ref)
+ }
+ }
+
+ diag.Message = fmt.Sprintf(
+ "prev linkage mismatch: did=%s tip=%s op_prev=%s op_cid=%s created_at=%s",
+ diag.DID,
+ diag.TipCID,
+ diag.PrevCID,
+ diag.OpCID,
+ diag.CreatedAt,
+ )
+
+ return diag
+}
+
+func (s *Service) traceFetchAttempt(trace FetchAttemptTrace) {
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "export_fetch",
+ RequestURL: trace.URL,
+ After: trace.After,
+ Count: trace.Count,
+ Attempt: trace.Attempt,
+ StatusCode: trace.StatusCode,
+ Retryable: trace.Retryable,
+ RetryAfterMS: trace.RetryAfter.Milliseconds(),
+ ParsedLines: trace.ParsedLines,
+ ParsedRecords: trace.ParsedRecords,
+ SkippedLines: trace.SkippedLines,
+ TruncatedTail: trace.TruncatedTail,
+ FirstCreatedAt: trace.FirstCreated,
+ LastCreatedAt: trace.LastCreated,
+ Error: trace.Error,
+ })
+}
+
+func (s *Service) traceDedupDecision(rec types.ExportRecord, reason string, batch FetchBatch, after uint64) {
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "dedup_decision",
+ Reason: reason,
+ Sequence: rec.Seq,
+ DID: rec.DID,
+ CID: rec.CID,
+ CreatedAt: rec.CreatedAt,
+ After: after,
+ Count: batch.Count,
+ RequestURL: batch.RequestURL,
+ FirstCreatedAt: batch.FirstCreated,
+ LastCreatedAt: batch.LastCreated,
+ })
+}
+
+func (s *Service) tracePrevMismatch(diag prevMismatchDiagnostic) {
+ log.Printf("%s", diag.Message)
+
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "prev_mismatch",
+ DID: diag.DID,
+ Tip: diag.TipCID,
+ Prev: diag.PrevCID,
+ CID: diag.OpCID,
+ CreatedAt: diag.CreatedAt,
+ Message: diag.Message,
+ Details: diag.LastOps,
+ })
+}
+
+func (s *Service) traceIntegrityError(message string, details map[string]any) {
+ log.Printf("%s", message)
+
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "integrity_error",
+ Message: message,
+ Details: details,
+ })
+}
+
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
_ = ctx
batchSize := s.cfg.CommitBatchSize
@@ -473,6 +770,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
}
result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
+ result.Ref.CreatedAt = v.op.RawRecord.CreatedAt
ref = &result.Ref
}
@@ -1383,6 +1681,10 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
}
func (s *Service) Close() {
+ if s.tracer != nil {
+ _ = s.tracer.Close()
+ }
+
if s.appender != nil {
s.appender.Close()
}